Skip to content

Commit

Permalink
feat(protocol): add rpc to set node log level on the fly
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Mar 18, 2024
1 parent b62f016 commit c28ea66
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 4 deletions.
7 changes: 5 additions & 2 deletions sn_node/src/bin/safenode/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ fn main() -> Result<()> {
let node_socket_addr = SocketAddr::new(opt.ip, opt.port);
let (root_dir, keypair) = get_root_dir_and_keypair(&opt.root_dir)?;

let (log_output_dest, _log_reload_handle, _log_appender_guard) =
let (log_output_dest, log_reload_handle, _log_appender_guard) =
init_logging(&opt, keypair.public().to_peer_id())?;

let rt = Runtime::new()?;
Expand Down Expand Up @@ -191,7 +191,8 @@ fn main() -> Result<()> {
let mut node_builder = node_builder;
#[cfg(feature = "open-metrics")]
node_builder.metrics_server_port(opt.metrics_server_port);
let restart_options = run_node(node_builder, opt.rpc, &log_output_dest).await?;
let restart_options =
run_node(node_builder, opt.rpc, &log_output_dest, log_reload_handle).await?;

Ok::<_, eyre::Report>(restart_options)
})?;
Expand All @@ -215,6 +216,7 @@ async fn run_node(
node_builder: NodeBuilder,
rpc: Option<SocketAddr>,
log_output_dest: &str,
log_reload_handle: ReloadHandle,
) -> Result<Option<(PathBuf, u16)>> {
let started_instant = std::time::Instant::now();

Expand Down Expand Up @@ -271,6 +273,7 @@ You can check your reward balance by running:
running_node.clone(),
ctrl_tx,
started_instant,
log_reload_handle,
);
}

Expand Down
29 changes: 28 additions & 1 deletion sn_node/src/bin/safenode/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use bls::{PublicKey, PK_SIZE};
use bytes::Bytes;
use eyre::{ErrReport, Result};
use sn_logging::ReloadHandle;
use sn_node::RunningNode;
use sn_protocol::node_rpc::NodeCtrl;
use sn_protocol::safenode_proto::{
Expand All @@ -19,7 +20,8 @@ use sn_protocol::safenode_proto::{
KBucketsRequest, KBucketsResponse, NetworkInfoRequest, NetworkInfoResponse, NodeEvent,
NodeEventsRequest, NodeInfoRequest, NodeInfoResponse, RecordAddressesRequest,
RecordAddressesResponse, RestartRequest, RestartResponse, StopRequest, StopResponse,
TransferNotifsFilterRequest, TransferNotifsFilterResponse, UpdateRequest, UpdateResponse,
TransferNotifsFilterRequest, TransferNotifsFilterResponse, UpdateLogLevelRequest,
UpdateLogLevelResponse, UpdateRequest, UpdateResponse,
};
use std::{
collections::HashMap,
Expand All @@ -40,6 +42,7 @@ struct SafeNodeRpcService {
running_node: RunningNode,
ctrl_tx: Sender<NodeCtrl>,
started_instant: Instant,
log_reload_handle: ReloadHandle,
}

// Implementing RPC interface for service defined in .proto
Expand Down Expand Up @@ -347,6 +350,28 @@ impl SafeNode for SafeNodeRpcService {
)),
}
}

async fn update_log_level(
&self,
request: Request<UpdateLogLevelRequest>,
) -> Result<Response<UpdateLogLevelResponse>, Status> {
debug!(
"RPC request received at {}: {:?}",
self.addr,
request.get_ref()
);

match self
.log_reload_handle
.modify_log_level(&request.get_ref().log_level)
{
Ok(()) => Ok(Response::new(UpdateLogLevelResponse {})),
Err(err) => Err(Status::new(
Code::Internal,
format!("Failed to update node's log level: {err:?}"),
)),
}
}
}

pub(crate) fn start_rpc_service(
Expand All @@ -355,6 +380,7 @@ pub(crate) fn start_rpc_service(
running_node: RunningNode,
ctrl_tx: Sender<NodeCtrl>,
started_instant: Instant,
log_reload_handle: ReloadHandle,
) {
// creating a service
let service = SafeNodeRpcService {
Expand All @@ -363,6 +389,7 @@ pub(crate) fn start_rpc_service(
running_node,
ctrl_tx,
started_instant,
log_reload_handle,
};
info!("RPC Server listening on {addr}");
println!("RPC Server listening on {addr}");
Expand Down
1 change: 1 addition & 0 deletions sn_node_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ mod tests {
async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> ServiceControlResult<()>;
async fn node_stop(&self, delay_millis: u64) -> ServiceControlResult<()>;
async fn node_update(&self, delay_millis: u64) -> ServiceControlResult<()>;
async fn update_log_level(&self, log_levels: String) -> ServiceControlResult<()>;
}
}

Expand Down
1 change: 1 addition & 0 deletions sn_node_manager/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ mod tests {
async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> RpcResult<()>;
async fn node_stop(&self, delay_millis: u64) -> RpcResult<()>;
async fn node_update(&self, delay_millis: u64) -> RpcResult<()>;
async fn update_log_level(&self, log_levels: String) -> RpcResult<()>;
}
}

Expand Down
19 changes: 19 additions & 0 deletions sn_node_rpc_client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ enum Cmd {
#[clap(default_value = "0")]
delay_millis: u64,
},
/// Update the node's log levels.
#[clap(name = "log")]
Log {
/// Change the log level of the safenode.
///
/// Example: --level SN_LOG=all,RUST_LOG=libp2p=debug
#[clap(name = "level", long)]
log_level: String,
},
}

#[tokio::main]
Expand Down Expand Up @@ -154,6 +163,7 @@ async fn main() -> Result<()> {
} => node_restart(addr, delay_millis, retain_peer_id).await,
Cmd::Stop { delay_millis } => node_stop(addr, delay_millis).await,
Cmd::Update { delay_millis } => node_update(addr, delay_millis).await,
Cmd::Log { log_level } => update_log_level(addr, log_level).await,
}
}

Expand Down Expand Up @@ -395,3 +405,12 @@ pub async fn node_update(addr: SocketAddr, delay_millis: u64) -> Result<()> {
);
Ok(())
}

pub async fn update_log_level(addr: SocketAddr, log_levels: String) -> Result<()> {
let endpoint = format!("https://{addr}");
let client = RpcClient::new(&endpoint);

client.update_log_level(log_levels.clone()).await?;
println!("Node successfully received the request to update the log level to {log_levels:?}",);
Ok(())
}
6 changes: 6 additions & 0 deletions sn_protocol/src/safenode_proto/req_resp_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,9 @@ message UpdateRequest {

message UpdateResponse {}

// Set the node's log level
message UpdateLogLevelRequest {
string log_level = 1;
}

message UpdateLogLevelResponse{}
3 changes: 3 additions & 0 deletions sn_protocol/src/safenode_proto/safenode.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,7 @@ service SafeNode {

// Update the node
rpc Update (UpdateRequest) returns (UpdateResponse);

// Update the log level of the node
rpc UpdateLogLevel (UpdateLogLevelRequest) returns (UpdateLogLevelResponse);
}
17 changes: 16 additions & 1 deletion sn_service_management/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use libp2p::{kad::RecordKey, Multiaddr, PeerId};
use sn_protocol::safenode_proto::{
safe_node_client::SafeNodeClient, GossipsubPublishRequest, GossipsubSubscribeRequest,
GossipsubUnsubscribeRequest, NetworkInfoRequest, NodeInfoRequest, RecordAddressesRequest,
RestartRequest, StopRequest, UpdateRequest,
RestartRequest, StopRequest, UpdateLogLevelRequest, UpdateRequest,
};
use std::{net::SocketAddr, path::PathBuf, str::FromStr};
use tokio::time::Duration;
Expand Down Expand Up @@ -51,6 +51,7 @@ pub trait RpcActions: Sync {
async fn node_restart(&self, delay_millis: u64, retain_peer_id: bool) -> Result<()>;
async fn node_stop(&self, delay_millis: u64) -> Result<()>;
async fn node_update(&self, delay_millis: u64) -> Result<()>;
async fn update_log_level(&self, log_levels: String) -> Result<()>;
}

pub struct RpcClient {
Expand Down Expand Up @@ -245,4 +246,18 @@ impl RpcActions for RpcClient {
})?;
Ok(())
}

async fn update_log_level(&self, log_levels: String) -> Result<()> {
let mut client = self.connect_with_retry().await?;
let _response = client
.update_log_level(Request::new(UpdateLogLevelRequest {
log_level: log_levels,
}))
.await
.map_err(|e| {
error!("Could not update node through RPC: {e:?}");
Error::RpcNodeUpdateError(e.to_string())
})?;
Ok(())
}
}

0 comments on commit c28ea66

Please sign in to comment.