From c28ea66a551217489b23e69ae6a4bc06b6224765 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Thu, 14 Mar 2024 15:14:48 +0530 Subject: [PATCH] feat(protocol): add rpc to set node log level on the fly --- sn_node/src/bin/safenode/main.rs | 7 +++-- sn_node/src/bin/safenode/rpc_service.rs | 29 ++++++++++++++++++- sn_node_manager/src/lib.rs | 1 + sn_node_manager/src/local.rs | 1 + sn_node_rpc_client/src/main.rs | 19 ++++++++++++ .../src/safenode_proto/req_resp_types.proto | 6 ++++ sn_protocol/src/safenode_proto/safenode.proto | 3 ++ sn_service_management/src/rpc.rs | 17 ++++++++++- 8 files changed, 79 insertions(+), 4 deletions(-) diff --git a/sn_node/src/bin/safenode/main.rs b/sn_node/src/bin/safenode/main.rs index 7eb8aff2ac..a1e0a5d805 100644 --- a/sn_node/src/bin/safenode/main.rs +++ b/sn_node/src/bin/safenode/main.rs @@ -159,7 +159,7 @@ fn main() -> Result<()> { let node_socket_addr = SocketAddr::new(opt.ip, opt.port); let (root_dir, keypair) = get_root_dir_and_keypair(&opt.root_dir)?; - let (log_output_dest, _log_reload_handle, _log_appender_guard) = + let (log_output_dest, log_reload_handle, _log_appender_guard) = init_logging(&opt, keypair.public().to_peer_id())?; let rt = Runtime::new()?; @@ -191,7 +191,8 @@ fn main() -> Result<()> { let mut node_builder = node_builder; #[cfg(feature = "open-metrics")] node_builder.metrics_server_port(opt.metrics_server_port); - let restart_options = run_node(node_builder, opt.rpc, &log_output_dest).await?; + let restart_options = + run_node(node_builder, opt.rpc, &log_output_dest, log_reload_handle).await?; Ok::<_, eyre::Report>(restart_options) })?; @@ -215,6 +216,7 @@ async fn run_node( node_builder: NodeBuilder, rpc: Option, log_output_dest: &str, + log_reload_handle: ReloadHandle, ) -> Result> { let started_instant = std::time::Instant::now(); @@ -271,6 +273,7 @@ You can check your reward balance by running: running_node.clone(), ctrl_tx, started_instant, + log_reload_handle, ); } diff --git a/sn_node/src/bin/safenode/rpc_service.rs b/sn_node/src/bin/safenode/rpc_service.rs index 9fdf9eff7b..6884ff580b 100644 --- a/sn_node/src/bin/safenode/rpc_service.rs +++ b/sn_node/src/bin/safenode/rpc_service.rs @@ -9,6 +9,7 @@ use bls::{PublicKey, PK_SIZE}; use bytes::Bytes; use eyre::{ErrReport, Result}; +use sn_logging::ReloadHandle; use sn_node::RunningNode; use sn_protocol::node_rpc::NodeCtrl; use sn_protocol::safenode_proto::{ @@ -19,7 +20,8 @@ use sn_protocol::safenode_proto::{ KBucketsRequest, KBucketsResponse, NetworkInfoRequest, NetworkInfoResponse, NodeEvent, NodeEventsRequest, NodeInfoRequest, NodeInfoResponse, RecordAddressesRequest, RecordAddressesResponse, RestartRequest, RestartResponse, StopRequest, StopResponse, - TransferNotifsFilterRequest, TransferNotifsFilterResponse, UpdateRequest, UpdateResponse, + TransferNotifsFilterRequest, TransferNotifsFilterResponse, UpdateLogLevelRequest, + UpdateLogLevelResponse, UpdateRequest, UpdateResponse, }; use std::{ collections::HashMap, @@ -40,6 +42,7 @@ struct SafeNodeRpcService { running_node: RunningNode, ctrl_tx: Sender, started_instant: Instant, + log_reload_handle: ReloadHandle, } // Implementing RPC interface for service defined in .proto @@ -347,6 +350,28 @@ impl SafeNode for SafeNodeRpcService { )), } } + + async fn update_log_level( + &self, + request: Request, + ) -> Result, Status> { + debug!( + "RPC request received at {}: {:?}", + self.addr, + request.get_ref() + ); + + match self + .log_reload_handle + .modify_log_level(&request.get_ref().log_level) + { + Ok(()) => Ok(Response::new(UpdateLogLevelResponse {})), + Err(err) => Err(Status::new( + Code::Internal, + format!("Failed to update node's log level: {err:?}"), + )), + } + } } pub(crate) fn start_rpc_service( @@ -355,6 +380,7 @@ pub(crate) fn start_rpc_service( running_node: RunningNode, ctrl_tx: Sender, started_instant: Instant, + log_reload_handle: ReloadHandle, ) { // creating a service let service = SafeNodeRpcService { @@ -363,6 +389,7 @@ pub(crate) fn start_rpc_service( running_node, ctrl_tx, started_instant, + log_reload_handle, }; info!("RPC Server listening on {addr}"); println!("RPC Server listening on {addr}"); diff --git a/sn_node_manager/src/lib.rs b/sn_node_manager/src/lib.rs index e8bea9d8a6..5d151f33d9 100644 --- a/sn_node_manager/src/lib.rs +++ b/sn_node_manager/src/lib.rs @@ -437,6 +437,7 @@ mod tests { async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> ServiceControlResult<()>; async fn node_stop(&self, delay_millis: u64) -> ServiceControlResult<()>; async fn node_update(&self, delay_millis: u64) -> ServiceControlResult<()>; + async fn update_log_level(&self, log_levels: String) -> ServiceControlResult<()>; } } diff --git a/sn_node_manager/src/local.rs b/sn_node_manager/src/local.rs index 6b8f71e413..5c79e02b77 100644 --- a/sn_node_manager/src/local.rs +++ b/sn_node_manager/src/local.rs @@ -407,6 +407,7 @@ mod tests { async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> RpcResult<()>; async fn node_stop(&self, delay_millis: u64) -> RpcResult<()>; async fn node_update(&self, delay_millis: u64) -> RpcResult<()>; + async fn update_log_level(&self, log_levels: String) -> RpcResult<()>; } } diff --git a/sn_node_rpc_client/src/main.rs b/sn_node_rpc_client/src/main.rs index 8988715d4d..ff16fb153f 100644 --- a/sn_node_rpc_client/src/main.rs +++ b/sn_node_rpc_client/src/main.rs @@ -110,6 +110,15 @@ enum Cmd { #[clap(default_value = "0")] delay_millis: u64, }, + /// Update the node's log levels. + #[clap(name = "log")] + Log { + /// Change the log level of the safenode. + /// + /// Example: --level SN_LOG=all,RUST_LOG=libp2p=debug + #[clap(name = "level", long)] + log_level: String, + }, } #[tokio::main] @@ -154,6 +163,7 @@ async fn main() -> Result<()> { } => node_restart(addr, delay_millis, retain_peer_id).await, Cmd::Stop { delay_millis } => node_stop(addr, delay_millis).await, Cmd::Update { delay_millis } => node_update(addr, delay_millis).await, + Cmd::Log { log_level } => update_log_level(addr, log_level).await, } } @@ -395,3 +405,12 @@ pub async fn node_update(addr: SocketAddr, delay_millis: u64) -> Result<()> { ); Ok(()) } + +pub async fn update_log_level(addr: SocketAddr, log_levels: String) -> Result<()> { + let endpoint = format!("https://{addr}"); + let client = RpcClient::new(&endpoint); + + client.update_log_level(log_levels.clone()).await?; + println!("Node successfully received the request to update the log level to {log_levels:?}",); + Ok(()) +} diff --git a/sn_protocol/src/safenode_proto/req_resp_types.proto b/sn_protocol/src/safenode_proto/req_resp_types.proto index 9a14fa5199..e58d78cde3 100644 --- a/sn_protocol/src/safenode_proto/req_resp_types.proto +++ b/sn_protocol/src/safenode_proto/req_resp_types.proto @@ -108,3 +108,9 @@ message UpdateRequest { message UpdateResponse {} +// Set the node's log level +message UpdateLogLevelRequest { + string log_level = 1; +} + +message UpdateLogLevelResponse{} \ No newline at end of file diff --git a/sn_protocol/src/safenode_proto/safenode.proto b/sn_protocol/src/safenode_proto/safenode.proto index b983436dd9..483ed66da5 100644 --- a/sn_protocol/src/safenode_proto/safenode.proto +++ b/sn_protocol/src/safenode_proto/safenode.proto @@ -57,4 +57,7 @@ service SafeNode { // Update the node rpc Update (UpdateRequest) returns (UpdateResponse); + + // Update the log level of the node + rpc UpdateLogLevel (UpdateLogLevelRequest) returns (UpdateLogLevelResponse); } diff --git a/sn_service_management/src/rpc.rs b/sn_service_management/src/rpc.rs index 58df409f3d..75b6dfd6ac 100644 --- a/sn_service_management/src/rpc.rs +++ b/sn_service_management/src/rpc.rs @@ -12,7 +12,7 @@ use libp2p::{kad::RecordKey, Multiaddr, PeerId}; use sn_protocol::safenode_proto::{ safe_node_client::SafeNodeClient, GossipsubPublishRequest, GossipsubSubscribeRequest, GossipsubUnsubscribeRequest, NetworkInfoRequest, NodeInfoRequest, RecordAddressesRequest, - RestartRequest, StopRequest, UpdateRequest, + RestartRequest, StopRequest, UpdateLogLevelRequest, UpdateRequest, }; use std::{net::SocketAddr, path::PathBuf, str::FromStr}; use tokio::time::Duration; @@ -51,6 +51,7 @@ pub trait RpcActions: Sync { async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> Result<()>; async fn node_stop(&self, delay_millis: u64) -> Result<()>; async fn node_update(&self, delay_millis: u64) -> Result<()>; + async fn update_log_level(&self, log_levels: String) -> Result<()>; } pub struct RpcClient { @@ -245,4 +246,18 @@ impl RpcActions for RpcClient { })?; Ok(()) } + + async fn update_log_level(&self, log_levels: String) -> Result<()> { + let mut client = self.connect_with_retry().await?; + let _response = client + .update_log_level(Request::new(UpdateLogLevelRequest { + log_level: log_levels, + })) + .await + .map_err(|e| { + error!("Could not update node through RPC: {e:?}"); + Error::RpcNodeUpdateError(e.to_string()) + })?; + Ok(()) + } }