Skip to content

Commit

Permalink
chore: refresh node registry before commands
Browse files Browse the repository at this point in the history
The node registry is updated to the best of our ability before we run the `start`, `stop`, `remove`
or `upgrade` commands.

Here is an example of why this should apply. If we start a service, the node registry now stores the
PID for the process that is started by the service. It's possible for a user to kill the process
outside of both the node manager and the OS service infrastructure. However, our service definitions
have a restart policy, meaning when the user kills the process, the service infrastructure will
start another one. In this scenario, the node registry now has a PID for a dead process. So if we
then use the node manager to try and stop the service, it determines there is no process with the
dead PID, and declares the service has already been stopped.

The solution is to refresh the registry and get the most up-to-date status before running any of the
control commands.
  • Loading branch information
jacderida authored and RolandSherwin committed Mar 26, 2024
1 parent 6cab37f commit 9fc56fb
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 47 deletions.
10 changes: 9 additions & 1 deletion sn_node_manager/src/cmd/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
},
config,
helpers::{download_and_extract_release, get_bin_version},
status_report, ServiceManager, VerbosityLevel,
refresh_node_registry, status_report, ServiceManager, VerbosityLevel,
};
use color_eyre::{eyre::eyre, Result};
use colored::Colorize;
Expand Down Expand Up @@ -122,6 +122,8 @@ pub async fn remove(
println!("=================================================");

let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
refresh_node_registry(&mut node_registry, &ServiceController {}, true).await?;

let service_indices = get_services_to_update(&node_registry, peer_ids, service_names)?;
if service_indices.is_empty() {
// This could be the case if all services are at `Removed` status.
Expand Down Expand Up @@ -164,6 +166,8 @@ pub async fn start(
}

let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
refresh_node_registry(&mut node_registry, &ServiceController {}, true).await?;

let service_indices = get_services_to_update(&node_registry, peer_ids, service_names)?;
if service_indices.is_empty() {
// This could be the case if all services are at `Removed` status.
Expand Down Expand Up @@ -246,6 +250,8 @@ pub async fn stop(
}

let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
refresh_node_registry(&mut node_registry, &ServiceController {}, true).await?;

let service_indices = get_services_to_update(&node_registry, peer_ids, service_names)?;
if service_indices.is_empty() {
// This could be the case if all services are at `Removed` status.
Expand Down Expand Up @@ -295,6 +301,8 @@ pub async fn upgrade(
download_and_get_upgrade_bin_path(ReleaseType::Safenode, url, version).await?;

let mut node_registry = NodeRegistry::load(&config::get_node_registry_path()?)?;
refresh_node_registry(&mut node_registry, &ServiceController {}, true).await?;

if !force {
let node_versions = node_registry
.nodes
Expand Down
102 changes: 57 additions & 45 deletions sn_node_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,51 +261,7 @@ pub async fn status_report(
output_json: bool,
fail: bool,
) -> Result<()> {
// Again confirm that services which are marked running are still actually running.
// If they aren't we'll mark them as stopped.
for node in &mut node_registry.nodes {
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
if let ServiceStatus::Running = node.status {
if let Some(pid) = node.pid {
// First we can try the PID we have now. If there is still a process running with
// that PID, we know the node is still running.
if service_control.is_service_process_running(pid) {
match rpc_client.network_info().await {
Ok(info) => {
node.connected_peers = Some(info.connected_peers);
}
Err(_) => {
node.connected_peers = None;
}
}
} else {
// The process with the PID we had has died at some point. However, if the
// service has been configured to restart on failures, it's possible that a new
// process has been launched and hence we would have a new PID. We can use the
// RPC service to try and retrieve it.
match rpc_client.node_info().await {
Ok(info) => {
node.pid = Some(info.pid);
}
Err(_) => {
// Finally, if there was an error communicating with the RPC client, we
// can assume that this node is actually stopped.
node.status = ServiceStatus::Stopped;
node.pid = None;
}
}
match rpc_client.network_info().await {
Ok(info) => {
node.connected_peers = Some(info.connected_peers);
}
Err(_) => {
node.connected_peers = None;
}
}
}
}
}
}
refresh_node_registry(node_registry, service_control, !output_json).await?;

if output_json {
let json = serde_json::to_string_pretty(&node_registry.to_status_summary())?;
Expand Down Expand Up @@ -404,6 +360,62 @@ pub async fn status_report(
Ok(())
}

pub async fn refresh_node_registry(
node_registry: &mut NodeRegistry,
service_control: &dyn ServiceControl,
print_refresh_message: bool,
) -> Result<()> {
// This message is useful for users, but needs to be suppressed when a JSON output is requested.
if print_refresh_message {
println!("Refreshing the node registry...");
}

for node in &mut node_registry.nodes {
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
if let ServiceStatus::Running = node.status {
if let Some(pid) = node.pid {
// First we can try the PID we have now. If there is still a process running with
// that PID, we know the node is still running.
if service_control.is_service_process_running(pid) {
match rpc_client.network_info().await {
Ok(info) => {
node.connected_peers = Some(info.connected_peers);
}
Err(_) => {
node.connected_peers = None;
}
}
} else {
// The process with the PID we had has died at some point. However, if the
// service has been configured to restart on failures, it's possible that a new
// process has been launched and hence we would have a new PID. We can use the
// RPC service to try and retrieve it.
match rpc_client.node_info().await {
Ok(info) => {
node.pid = Some(info.pid);
}
Err(_) => {
// Finally, if there was an error communicating with the RPC client, we
// can assume that this node is actually stopped.
node.status = ServiceStatus::Stopped;
node.pid = None;
}
}
match rpc_client.network_info().await {
Ok(info) => {
node.connected_peers = Some(info.connected_peers);
}
Err(_) => {
node.connected_peers = None;
}
}
}
}
}
}
Ok(())
}

fn format_status(status: &ServiceStatus) -> String {
match status {
ServiceStatus::Running => "RUNNING".green().to_string(),
Expand Down
2 changes: 1 addition & 1 deletion sn_node_manager/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ fn parse_service_status(output: &[u8]) -> Vec<ServiceStatus> {
let output_str = String::from_utf8_lossy(output);
output_str
.split('\n')
.skip(4) // Skip header lines
.skip(5) // Skip header lines
.filter(|line| !line.is_empty())
.map(|line| {
let columns: Vec<&str> = line.split_whitespace().collect();
Expand Down

0 comments on commit 9fc56fb

Please sign in to comment.