From 26c5eeece6afce53936a4041050f7f2e165c7b50 Mon Sep 17 00:00:00 2001 From: Josh Wilson Date: Tue, 15 Oct 2024 14:45:58 +0900 Subject: [PATCH 1/2] feat(node): kill nodes running on host with little spare CPU uses node control flow to exit a node if we consistently detect high CPU --- Cargo.lock | 1 + sn_node/Cargo.toml | 1 + sn_node/src/bin/safenode/main.rs | 53 ++++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index f5267b6c79..8ea74c2bc0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8256,6 +8256,7 @@ dependencies = [ "sn_service_management", "sn_transfers", "strum", + "sysinfo", "tempfile", "test_utils", "thiserror", diff --git a/sn_node/Cargo.toml b/sn_node/Cargo.toml index 46a90789d6..2df87dcb0e 100644 --- a/sn_node/Cargo.toml +++ b/sn_node/Cargo.toml @@ -60,6 +60,7 @@ sn_registers = { path = "../sn_registers", version = "0.3.21" } sn_transfers = { path = "../sn_transfers", version = "0.19.3" } sn_service_management = { path = "../sn_service_management", version = "0.3.14" } sn_evm = { path = "../sn_evm", version = "0.1" } +sysinfo = { version = "0.30.8", default-features = false } thiserror = "1.0.23" tokio = { version = "1.32.0", features = [ "io-util", diff --git a/sn_node/src/bin/safenode/main.rs b/sn_node/src/bin/safenode/main.rs index 802c6696a8..487eec9a69 100644 --- a/sn_node/src/bin/safenode/main.rs +++ b/sn_node/src/bin/safenode/main.rs @@ -34,6 +34,7 @@ use std::{ process::Command, time::Duration, }; +use sysinfo::{self, System}; use tokio::{ runtime::Runtime, sync::{broadcast::error::RecvError, mpsc}, @@ -387,6 +388,58 @@ You can check your reward balance by running: error!("Failed to send node control msg to safenode bin main thread: {err}"); } }); + let ctrl_tx_clone_cpu = ctrl_tx.clone(); + // Monitor host CPU usage + tokio::spawn(async move { + use rand::{thread_rng, Rng}; + + const CPU_CHECK_INTERVAL: Duration = Duration::from_secs(60); + const CPU_USAGE_THRESHOLD: f32 = 50.0; + const 
HIGH_CPU_CONSECUTIVE_LIMIT: u8 = 5; const NODE_STOP_DELAY: Duration = Duration::from_secs(1); const INITIAL_DELAY_MIN_S: u64 = 10; const INITIAL_DELAY_MAX_S: u64 = HIGH_CPU_CONSECUTIVE_LIMIT as u64 * CPU_CHECK_INTERVAL.as_secs(); const JITTER_MIN_S: u64 = 1; const JITTER_MAX_S: u64 = 15; let mut sys = System::new_all(); let mut high_cpu_count: u8 = 0; // Random initial delay between 10 seconds and 5 minutes let initial_delay = Duration::from_secs(thread_rng().gen_range(INITIAL_DELAY_MIN_S..=INITIAL_DELAY_MAX_S)); tokio::time::sleep(initial_delay).await; loop { sys.refresh_cpu(); let cpu_usage = sys.global_cpu_info().cpu_usage(); if cpu_usage > CPU_USAGE_THRESHOLD { high_cpu_count += 1; } else { high_cpu_count = 0; } if high_cpu_count >= HIGH_CPU_CONSECUTIVE_LIMIT { if let Err(err) = ctrl_tx_clone_cpu .send(NodeCtrl::Stop { delay: NODE_STOP_DELAY, cause: eyre!("Excess host CPU detected for {HIGH_CPU_CONSECUTIVE_LIMIT} consecutive minutes!"), }) .await { error!("Failed to send node control msg to safenode bin main thread: {err}"); } break; } // Add jitter to the interval let jitter = Duration::from_secs(thread_rng().gen_range(JITTER_MIN_S..=JITTER_MAX_S)); tokio::time::sleep(CPU_CHECK_INTERVAL + jitter).await; } }); // Start up gRPC interface if enabled by user if let Some(addr) = rpc { From 90a8f261098def9dd61cabd4beaa6d3cdee153bb Mon Sep 17 00:00:00 2001 From: Chris O'Neil Date: Fri, 18 Oct 2024 18:50:40 +0100 Subject: [PATCH 2/2] feat: shutting down on excess cpu does not return error The node RPC is changed such that the stop event allows shutting the node down with either a success or failure result. Then, in the case of the excessive CPU usage, the node will be shut down with a success result. This allows us to retain an on-failure restart policy for node-based services: with excess CPU usage, they would shut down with a success and so would not be started again. 
--- sn_node/src/bin/safenode/main.rs | 27 ++++++++++++++++--------- sn_node/src/bin/safenode/rpc_service.rs | 4 ++-- sn_protocol/src/node_rpc.rs | 8 +++++++- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/sn_node/src/bin/safenode/main.rs b/sn_node/src/bin/safenode/main.rs index 487eec9a69..dd328b14b2 100644 --- a/sn_node/src/bin/safenode/main.rs +++ b/sn_node/src/bin/safenode/main.rs @@ -24,7 +24,7 @@ use sn_logging::{Level, LogFormat, LogOutputDest, ReloadHandle}; use sn_node::{Marker, NodeBuilder, NodeEvent, NodeEventsReceiver}; use sn_peers_acquisition::PeersArgs; use sn_protocol::{ - node::get_safenode_root_dir, node_rpc::NodeCtrl, version::IDENTIFY_PROTOCOL_STR, + node::get_safenode_root_dir, node_rpc::{NodeCtrl, StopResult}, version::IDENTIFY_PROTOCOL_STR, }; use std::{ env, @@ -381,7 +381,7 @@ You can check your reward balance by running: if let Err(err) = ctrl_tx_clone .send(NodeCtrl::Stop { delay: Duration::from_secs(1), - cause: eyre!("Ctrl-C received!"), + result: StopResult::Error(eyre!("Ctrl-C received!")), }) .await { @@ -426,7 +426,7 @@ You can check your reward balance by running: if let Err(err) = ctrl_tx_clone_cpu .send(NodeCtrl::Stop { delay: NODE_STOP_DELAY, - cause: eyre!("Excess host CPU detected for {HIGH_CPU_CONSECUTIVE_LIMIT} consecutive minutes!"), + result: StopResult::Success(format!("Excess host CPU %{CPU_USAGE_THRESHOLD} detected for {HIGH_CPU_CONSECUTIVE_LIMIT} consecutive minutes!")), }) .await { @@ -475,12 +475,21 @@ You can check your reward balance by running: break Ok(res); } - Some(NodeCtrl::Stop { delay, cause }) => { + Some(NodeCtrl::Stop { delay, result }) => { let msg = format!("Node is stopping in {delay:?}..."); info!("{msg}"); println!("{msg} Node log path: {log_output_dest}"); sleep(delay).await; - return Err(cause); + match result { + StopResult::Success(message) => { + info!("Node stopped successfully: {}", message); + return Ok(None); + } + StopResult::Error(cause) => { + error!("Node stopped with 
error: {}", cause); + return Err(cause); + } + } } Some(NodeCtrl::Update(_delay)) => { // TODO: implement self-update once safenode app releases are published again @@ -503,7 +512,7 @@ fn monitor_node_events(mut node_events_rx: NodeEventsReceiver, ctrl_tx: mpsc::Se if let Err(err) = ctrl_tx .send(NodeCtrl::Stop { delay: Duration::from_secs(1), - cause: eyre!("Node events channel closed!"), + result: StopResult::Error(eyre!("Node events channel closed!")), }) .await { @@ -517,13 +526,11 @@ fn monitor_node_events(mut node_events_rx: NodeEventsReceiver, ctrl_tx: mpsc::Se if let Err(err) = ctrl_tx .send(NodeCtrl::Stop { delay: Duration::from_secs(1), - cause: eyre!("Node terminated due to: {reason:?}"), + result: StopResult::Error(eyre!("Node terminated due to: {reason:?}")), }) .await { - error!( - "Failed to send node control msg to safenode bin main thread: {err}" - ); + error!("Failed to send node control msg to safenode bin main thread: {err}"); break; } } diff --git a/sn_node/src/bin/safenode/rpc_service.rs b/sn_node/src/bin/safenode/rpc_service.rs index c42503f112..eef388b2d5 100644 --- a/sn_node/src/bin/safenode/rpc_service.rs +++ b/sn_node/src/bin/safenode/rpc_service.rs @@ -9,7 +9,7 @@ use eyre::{ErrReport, Result}; use sn_logging::ReloadHandle; use sn_node::RunningNode; -use sn_protocol::node_rpc::NodeCtrl; +use sn_protocol::node_rpc::{NodeCtrl, StopResult}; use sn_protocol::safenode_proto::{ k_buckets_response, safe_node_server::{SafeNode, SafeNodeServer}, @@ -202,7 +202,7 @@ impl SafeNode for SafeNodeRpcService { }; let delay = Duration::from_millis(request.get_ref().delay_millis); - match self.ctrl_tx.send(NodeCtrl::Stop { delay, cause }).await { + match self.ctrl_tx.send(NodeCtrl::Stop { delay, result: StopResult::Success(cause.to_string()) }).await { Ok(()) => Ok(Response::new(StopResponse {})), Err(err) => Err(Status::new( Code::Internal, diff --git a/sn_protocol/src/node_rpc.rs b/sn_protocol/src/node_rpc.rs index 599e874221..d35ddac5b4 100644 --- 
a/sn_protocol/src/node_rpc.rs +++ b/sn_protocol/src/node_rpc.rs @@ -15,7 +15,7 @@ pub enum NodeCtrl { /// Request to stop the execution of the safenode app, providing an error as a reason for it. Stop { delay: Duration, - cause: Error, + result: StopResult, }, /// Request to restart the execution of the safenode app, retrying to join the network, after the requested delay. /// Set `retain_peer_id` to `true` if you want to re-use the same root dir/secret keys/PeerId. @@ -26,3 +26,9 @@ pub enum NodeCtrl { // Request to update the safenode app, and restart it, after the requested delay. Update(Duration), } + +#[derive(Debug)] +pub enum StopResult { + Success(String), + Error(Error), +}