Skip to content

Commit

Permalink
chore: improvements from dev feedback
Browse files Browse the repository at this point in the history
Reduce the start up time for the local network by removing the wait in the `get_available_port`
function. This function is now only being used to select ports for the RPC service, so it shouldn't
be subject to the same problems we had with the node when it switched over to Quic.

Also, the `--clean` argument on the run command has been extended to kill any existing local
network.
  • Loading branch information
jacderida authored and joshuef committed Feb 8, 2024
1 parent dd6be44 commit aadc8b4
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 48 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sn_node_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ service-manager = "0.5.1"
sn_node_rpc_client = { path = "../sn_node_rpc_client", version = "0.4.35" }
sn_peers_acquisition = { path = "../sn_peers_acquisition", version = "0.2.4" }
sn_protocol = { path = "../sn_protocol", version = "0.12.4" }
sn-releases = "0.1.6"
sn-releases = "0.1.7"
sysinfo = "0.29.10"
tokio = { version = "1.26", features = ["full"] }
uuid = { version = "1.5.0", features = ["v4"] }
Expand Down
9 changes: 6 additions & 3 deletions sn_node_manager/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ pub enum UpgradeResult {
Error(String),
}

// macOS seems to require this delay to be in seconds rather than milliseconds.
const RPC_START_UP_DELAY_MS: u64 = 3000;

pub async fn start(
node: &mut Node,
service_control: &dyn ServiceControl,
Expand All @@ -47,7 +50,7 @@ pub async fn start(
service_control.start(&node.service_name)?;

// Give the node a little bit of time to start before initiating the node info query.
service_control.wait(3);
service_control.wait(RPC_START_UP_DELAY_MS);
let node_info = rpc_client.node_info().await?;
let network_info = rpc_client.network_info().await?;
node.listen_addr = Some(
Expand Down Expand Up @@ -384,7 +387,7 @@ mod tests {
.returning(|_| Ok(()));
mock_service_control
.expect_wait()
.with(eq(3))
.with(eq(3000))
.times(1)
.returning(|_| ());
mock_rpc_client.expect_node_info().times(1).returning(|| {
Expand Down Expand Up @@ -457,7 +460,7 @@ mod tests {
.returning(|_| Ok(()));
mock_service_control
.expect_wait()
.with(eq(3))
.with(eq(3000))
.times(1)
.returning(|_| ());
mock_rpc_client.expect_node_info().times(1).returning(|| {
Expand Down
12 changes: 9 additions & 3 deletions sn_node_manager/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Launcher for LocalSafeLauncher {
///
/// This is wrapped mainly just for unit testing.
fn wait(&self, delay: u64) {
std::thread::sleep(std::time::Duration::from_secs(delay));
std::thread::sleep(std::time::Duration::from_millis(delay));
}
}

Expand Down Expand Up @@ -180,6 +180,7 @@ pub fn kill_network(node_registry: &NodeRegistry, keep_directories: bool) -> Res
pub struct LocalNetworkOptions {
pub faucet_bin_path: PathBuf,
pub join: bool,
pub interval: u64,
pub node_count: u16,
pub peers: Option<Vec<Multiaddr>>,
pub safenode_bin_path: PathBuf,
Expand Down Expand Up @@ -233,6 +234,7 @@ pub async fn run_network(
let node = run_node(
number,
true,
network_options.interval,
rpc_socket_addr,
vec![],
&launcher,
Expand All @@ -242,6 +244,7 @@ pub async fn run_network(
node_registry.nodes.push(node.clone());
(node.listen_addr.unwrap(), 2)
};
node_registry.save()?;

for _ in start..=network_options.node_count {
let rpc_port = service_control.get_available_port()?;
Expand All @@ -252,6 +255,7 @@ pub async fn run_network(
let node = run_node(
number,
false,
network_options.interval,
rpc_socket_addr,
bootstrap_peers.clone(),
&launcher,
Expand Down Expand Up @@ -285,6 +289,7 @@ pub async fn run_network(
pub async fn run_node(
number: u16,
genesis: bool,
interval: u64,
rpc_socket_addr: SocketAddr,
bootstrap_peers: Vec<Multiaddr>,
launcher: &dyn Launcher,
Expand All @@ -294,7 +299,7 @@ pub async fn run_node(

println!("Launching node {number}...");
launcher.launch_node(rpc_socket_addr, bootstrap_peers.clone())?;
launcher.wait(2);
launcher.wait(interval);

let node_info = rpc_client.node_info().await?;
let peer_id = node_info.peer_id;
Expand Down Expand Up @@ -427,7 +432,7 @@ mod tests {
.returning(|_, _| Ok(()));
mock_launcher
.expect_wait()
.with(eq(2))
.with(eq(100))
.times(1)
.returning(|_| ());
mock_launcher
Expand Down Expand Up @@ -461,6 +466,7 @@ mod tests {
let node = run_node(
1,
true,
100,
rpc_socket_addr,
vec![],
&mock_launcher,
Expand Down
77 changes: 47 additions & 30 deletions sn_node_manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ pub enum SubCmd {
#[clap(long)]
version: Option<String>,
},
/// Add one or more new safenode services.
///
/// This command must run as the root/administrative user.
/// Run a faucet server for use with a local network.
#[clap(name = "faucet")]
Faucet {
/// Set to build the safenode and faucet binaries.
Expand Down Expand Up @@ -197,6 +195,11 @@ pub enum SubCmd {
/// The version and path arguments are mutually exclusive.
#[clap(long)]
faucet_version: Option<String>,
/// An interval applied between launching each node.
///
/// Units are milliseconds.
#[clap(long, default_value_t = 200)]
interval: u64,
/// Path to a safenode binary
///
/// The path and version arguments are mutually exclusive.
Expand Down Expand Up @@ -244,9 +247,7 @@ pub enum SubCmd {
/// This assumes the command is being run from the root of the safe_network repository.
#[clap(long)]
build: bool,
/// Set to remove the client data directory.
///
/// This can be useful for clearing wallets from previous local networks.
/// Set to remove the client data directory and kill any existing local network.
#[clap(long)]
clean: bool,
/// The number of nodes to run.
Expand All @@ -262,6 +263,11 @@ pub enum SubCmd {
/// The version and path arguments are mutually exclusive.
#[clap(long, conflicts_with = "build")]
faucet_version: Option<String>,
/// An interval applied between launching each node.
///
/// Units are milliseconds.
#[clap(long, default_value_t = 200)]
interval: u64,
/// Path to a safenode binary
///
/// The path and version arguments are mutually exclusive.
Expand Down Expand Up @@ -440,6 +446,7 @@ async fn main() -> Result<()> {
count,
faucet_path,
faucet_version,
interval,
node_path,
node_version,
peers,
Expand Down Expand Up @@ -481,6 +488,7 @@ async fn main() -> Result<()> {
};
let options = LocalNetworkOptions {
faucet_bin_path: faucet_path,
interval,
join: true,
node_count: count,
peers,
Expand All @@ -490,22 +498,7 @@ async fn main() -> Result<()> {
run_network(&mut local_node_registry, &NodeServiceManager {}, options).await?;
Ok(())
}
SubCmd::Kill { keep_directories } => {
let local_reg_path = &get_local_node_registry_path()?;
let local_node_registry = NodeRegistry::load(local_reg_path)?;
if local_node_registry.nodes.is_empty() {
println!("No local network is currently running");
} else {
if verbosity != VerbosityLevel::Minimal {
println!("=================================================");
println!(" Killing Local Network ");
println!("=================================================");
}
kill_network(&local_node_registry, keep_directories)?;
std::fs::remove_file(local_reg_path)?;
}
Ok(())
}
SubCmd::Kill { keep_directories } => kill_local_network(verbosity, keep_directories),
SubCmd::Remove {
peer_id,
service_name,
Expand Down Expand Up @@ -555,26 +548,32 @@ async fn main() -> Result<()> {
count,
faucet_path,
faucet_version,
interval,
node_path,
node_version,
skip_validation: _,
} => {
// In the clean case, the node registry must be loaded *after* the existing network has
// been killed, which clears it out.
let local_node_reg_path = &get_local_node_registry_path()?;
let mut local_node_registry = NodeRegistry::load(local_node_reg_path)?;
if !local_node_registry.nodes.is_empty() {
return Err(eyre!("A local network is already running")
.suggestion("Use the kill command to destroy the network then try again"));
}

if clean {
let mut local_node_registry = if clean {
let client_data_path = dirs_next::data_dir()
.ok_or_else(|| eyre!("Could not obtain user's data directory"))?
.join("safe")
.join("client");
if client_data_path.is_dir() {
std::fs::remove_dir_all(client_data_path)?;
}
}
kill_local_network(verbosity.clone(), false)?;
NodeRegistry::load(local_node_reg_path)?
} else {
let local_node_registry = NodeRegistry::load(local_node_reg_path)?;
if !local_node_registry.nodes.is_empty() {
return Err(eyre!("A local network is already running")
.suggestion("Use the kill command to destroy the network then try again"));
}
local_node_registry
};

if verbosity != VerbosityLevel::Minimal {
println!("=================================================");
Expand Down Expand Up @@ -603,6 +602,7 @@ async fn main() -> Result<()> {
let options = LocalNetworkOptions {
faucet_bin_path: faucet_path,
join: false,
interval,
node_count: count,
peers: None,
safenode_bin_path: node_path,
Expand Down Expand Up @@ -1007,3 +1007,20 @@ fn build_binary(bin_type: &ReleaseType) -> Result<()> {

Ok(())
}

fn kill_local_network(verbosity: VerbosityLevel, keep_directories: bool) -> Result<()> {
let local_reg_path = &get_local_node_registry_path()?;
let local_node_registry = NodeRegistry::load(local_reg_path)?;
if local_node_registry.nodes.is_empty() {
println!("No local network is currently running");
} else {
if verbosity != VerbosityLevel::Minimal {
println!("=================================================");
println!(" Killing Local Network ");
println!("=================================================");
}
kill_network(&local_node_registry, keep_directories)?;
std::fs::remove_file(local_reg_path)?;
}
Ok(())
}
10 changes: 1 addition & 9 deletions sn_node_manager/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,9 @@ use std::{
ffi::OsString,
net::{SocketAddr, TcpListener},
path::PathBuf,
thread::sleep,
time::Duration,
};
use sysinfo::{Pid, System, SystemExt};

// The UDP port might fail to unbind even when dropped and this can cause the safenode process to throw errors.
const PORT_UNBINDING_DELAY: Duration = Duration::from_secs(3);

#[derive(Debug, PartialEq)]
pub struct ServiceConfig {
pub data_dir_path: PathBuf,
Expand Down Expand Up @@ -163,9 +158,6 @@ impl ServiceControl for NodeServiceManager {
let socket = TcpListener::bind(addr)?;
let port = socket.local_addr()?.port();
drop(socket);
// Sleep a little while to make sure that we've dropped the socket.
// Without the delay, we may face 'Port already in use' error, when trying to re-use this port.
sleep(PORT_UNBINDING_DELAY);

Ok(port)
}
Expand Down Expand Up @@ -278,6 +270,6 @@ impl ServiceControl for NodeServiceManager {
///
/// This is wrapped mainly just for unit testing.
fn wait(&self, delay: u64) {
std::thread::sleep(std::time::Duration::from_secs(delay));
std::thread::sleep(std::time::Duration::from_millis(delay));
}
}

0 comments on commit aadc8b4

Please sign in to comment.