Skip to content

Commit

Permalink
Bind server port using REUSEADDR and REUSEPORT.
Browse files Browse the repository at this point in the history
Bind server socket with SO_REUSE{ADDR,PORT} socket options to more fairly distribute load to worker threads.
  • Loading branch information
int08h committed Jun 13, 2024
1 parent c675201 commit f462b48
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 32 deletions.
58 changes: 29 additions & 29 deletions src/bin/roughenough-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,16 @@
#[macro_use]
extern crate log;

use std::{env, thread};
use std::net::UdpSocket as StdUdpSocket;
use std::os::fd::{AsRawFd, FromRawFd};
use std::{env, io, thread};
use std::process;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};

use log::LevelFilter;
use mio::Events;
use mio::net::UdpSocket;
use nix::sys::socket::{setsockopt, sockopt::ReusePort};
use nix::sys::socket::sockopt::ReuseAddr;
use net2::UdpBuilder;
use net2::unix::UnixUdpBuilderExt;
use once_cell::sync::Lazy;
use simple_logger::SimpleLogger;

Expand All @@ -47,8 +45,7 @@ use roughenough::server::Server;
// the Ctrl-C (SIGINT) handler created in `set_ctrlc_handler()`
static KEEP_RUNNING: Lazy<AtomicBool> = Lazy::new(|| AtomicBool::new(true));

fn polling_loop(cfg: Arc<Mutex<Box<dyn ServerConfig>>>) {
let socket = bind_socket(cfg.clone());
fn polling_loop(cfg: Arc<Mutex<Box<dyn ServerConfig>>>, socket: UdpSocket) {
let mut server = {
let config = cfg.lock().unwrap();
let server = Server::new(config.as_ref(), socket);
Expand All @@ -57,7 +54,7 @@ fn polling_loop(cfg: Arc<Mutex<Box<dyn ServerConfig>>>) {
server
};

let mut events = Events::with_capacity(2048);
let mut events = Events::with_capacity(1024);

loop {
server.process_events(&mut events);
Expand All @@ -74,6 +71,24 @@ fn set_ctrlc_handler() {
.expect("failed setting Ctrl-C handler");
}

// Bind to the server port using SO_REUSEPORT and SO_REUSEADDR so the kernel will more fairly
// balance traffic to each worker. https://lwn.net/Articles/542629/
fn bind_socket(config: Arc<Mutex<Box<dyn ServerConfig>>>) -> io::Result<UdpSocket> {
let sock_addr = config
.lock()
.unwrap()
.udp_socket_addr()
.expect("udp sock addr");

let std_socket = UdpBuilder::new_v4()?
.reuse_address(true)?
.reuse_port(true)?
.bind(sock_addr)?;

let mio_socket: UdpSocket = UdpSocket::from_socket(std_socket)?;
Ok(mio_socket)
}

fn display_config(server: &Server, cfg: &dyn ServerConfig) {
info!("Processing thread : {}", server.thread_name());
info!("Number of workers : {}", cfg.num_workers());
Expand Down Expand Up @@ -113,24 +128,6 @@ fn display_config(server: &Server, cfg: &dyn ServerConfig) {
}
}

fn bind_socket(config: Arc<Mutex<Box<dyn ServerConfig>>>) -> UdpSocket {
let sock_addr = config
.lock()
.unwrap()
.udp_socket_addr()
.expect("udp sock addr");

let socket = UdpSocket::bind(&sock_addr).expect("failed to bind to socket");

unsafe {
let std_sock = StdUdpSocket::from_raw_fd(socket.as_raw_fd());
setsockopt(&std_sock, ReusePort, &true).expect("setting SO_REUSEPORT");
setsockopt(&std_sock, ReuseAddr, &true).expect("setting SO_REUSEADDR");
}

socket
}

pub fn main() {
SimpleLogger::new()
.with_level(LevelFilter::Info)
Expand Down Expand Up @@ -158,14 +155,17 @@ pub fn main() {

set_ctrlc_handler();

// TODO(stuart) move TCP healthcheck out of worker threads as it currently conflicts
// TODO(stuart) TCP healthcheck REUSEADDR and RESUSEPORT on the tcp socket

let mut threads = Vec::new();

for i in 0..config.lock().unwrap().num_workers() {
let num_workers = config.lock().unwrap().num_workers();
for i in 0..num_workers {
let cfg = config.clone();
let socket = bind_socket(cfg.clone()).unwrap();
let thread = thread::Builder::new()
.name(format!("worker-{}", i))
.spawn(move || polling_loop(cfg) )
.spawn(move || polling_loop(cfg.clone(), socket) )
.expect("failure spawning thread");

threads.push(thread);
Expand Down
6 changes: 3 additions & 3 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::net::{IpAddr, Shutdown, SocketAddr};
use std::thread;
use std::time::Duration;

use humansize::{file_size_opts as fsopts, FileSize};
use humansize::{BINARY, format_size};
use mio::net::{TcpListener, UdpSocket};
use mio::{Events, Poll, PollOpt, Ready, Token};
use mio_extras::timer::Timer;
Expand Down Expand Up @@ -293,7 +293,7 @@ impl Server {
counts.invalid_requests,
counts.classic_responses_sent,
counts.rfc_responses_sent,
counts.bytes_sent.file_size(fsopts::BINARY).unwrap(),
format_size(counts.bytes_sent, BINARY),
counts.failed_send_attempts,
counts.retried_send_attempts
);
Expand All @@ -310,7 +310,7 @@ impl Server {
self.stats.total_responses_sent(),
self.stats.num_classic_responses_sent(),
self.stats.num_rfc_responses_sent(),
self.stats.total_bytes_sent().file_size(fsopts::BINARY).unwrap(),
format_size(self.stats.total_bytes_sent(), BINARY),
self.stats.total_failed_send_attempts(),
self.stats.total_retried_send_attempts()
);
Expand Down

0 comments on commit f462b48

Please sign in to comment.