Skip to content

Commit

Permalink
Merge pull request #39
Browse files Browse the repository at this point in the history
Improve worker load distribution
  • Loading branch information
int08h authored Jun 13, 2024
2 parents ecb9716 + 40f6049 commit daa23fd
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 46 deletions.
19 changes: 10 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "roughenough"
version = "1.2.1-draft8"
version = "1.3.0-draft8"
repository = "https://github.com/int08h/roughenough"
authors = ["Stuart Stock <[email protected]>", "Aaron Hill <[email protected]>"]
license = "Apache-2.0"
Expand All @@ -20,20 +20,21 @@ gcpkms = ["google-cloudkms1", "hyper", "hyper-rustls", "serde", "serde_json", "y
[dependencies]
byteorder = "1"
chrono = "0.4"
clap = "2"
ctrlc = { version = "3.2", features = ["termination"] }
humansize = "1"
clap = "4"
ctrlc = { version = "3.4", features = ["termination"] }
humansize = "2"
log = "0.4"
mio = "0.6"
mio-extras = "2.0"
net2 = "0.2"
once_cell = "1.19"
rand = "0.6"
ring = "0.16"
simple_logger = "1"
ring = "0.17"
simple_logger = "5"
yaml-rust = "0.4"
zeroize = "1.4"
data-encoding = "2.3"
enum-iterator = "2.0"
zeroize = "1.8"
data-encoding = "2.6"
enum-iterator = "2.1"

# Used by 'awskms' and 'gcpkms'
futures = { version = "^0.3", optional = true }
Expand Down
50 changes: 31 additions & 19 deletions src/bin/roughenough-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@
#[macro_use]
extern crate log;

use std::{env, io, thread};
use std::process;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::{env, thread};
use std::sync::atomic::{AtomicBool, Ordering};

use log::LevelFilter;
use mio::net::UdpSocket;
use mio::Events;
use mio::net::UdpSocket;
use net2::UdpBuilder;
use net2::unix::UnixUdpBuilderExt;
use once_cell::sync::Lazy;
use simple_logger::SimpleLogger;

Expand All @@ -43,7 +45,7 @@ use roughenough::server::Server;
// the Ctrl-C (SIGINT) handler created in `set_ctrlc_handler()`
static KEEP_RUNNING: Lazy<AtomicBool> = Lazy::new(|| AtomicBool::new(true));

fn polling_loop(cfg: Arc<Mutex<Box<dyn ServerConfig>>>, socket: Arc<UdpSocket>) {
fn polling_loop(cfg: Arc<Mutex<Box<dyn ServerConfig>>>, socket: UdpSocket) {
let mut server = {
let config = cfg.lock().unwrap();
let server = Server::new(config.as_ref(), socket);
Expand All @@ -52,7 +54,7 @@ fn polling_loop(cfg: Arc<Mutex<Box<dyn ServerConfig>>>, socket: Arc<UdpSocket>)
server
};

let mut events = Events::with_capacity(2048);
let mut events = Events::with_capacity(1024);

loop {
server.process_events(&mut events);
Expand All @@ -69,6 +71,24 @@ fn set_ctrlc_handler() {
.expect("failed setting Ctrl-C handler");
}

// Bind to the server port using SO_REUSEPORT and SO_REUSEADDR so the kernel will more fairly
// balance traffic to each worker. https://lwn.net/Articles/542629/
fn bind_socket(config: Arc<Mutex<Box<dyn ServerConfig>>>) -> io::Result<UdpSocket> {
let sock_addr = config
.lock()
.unwrap()
.udp_socket_addr()
.expect("udp sock addr");

let std_socket = UdpBuilder::new_v4()?
.reuse_address(true)?
.reuse_port(true)?
.bind(sock_addr)?;

let mio_socket: UdpSocket = UdpSocket::from_socket(std_socket)?;
Ok(mio_socket)
}

fn display_config(server: &Server, cfg: &dyn ServerConfig) {
info!("Processing thread : {}", server.thread_name());
info!("Number of workers : {}", cfg.num_workers());
Expand Down Expand Up @@ -133,27 +153,19 @@ pub fn main() {
Ok(cfg) => Arc::new(Mutex::new(cfg)),
};

let socket = {
let sock_addr = config
.lock()
.unwrap()
.udp_socket_addr()
.expect("udp sock addr");
let sock = UdpSocket::bind(&sock_addr).expect("failed to bind to socket");
Arc::new(sock)
};

set_ctrlc_handler();

// TODO(stuart) move TCP healthcheck out of worker threads as it currently conflicts
// TODO(stuart) TCP healthcheck REUSEADDR and RESUSEPORT on the tcp socket

let mut threads = Vec::new();

for i in 0..config.lock().unwrap().num_workers() {
let num_workers = config.lock().unwrap().num_workers();
for i in 0..num_workers {
let cfg = config.clone();
let sock = socket.try_clone().unwrap();
let socket = bind_socket(cfg.clone()).unwrap();
let thread = thread::Builder::new()
.name(format!("worker-{}", i))
.spawn(move || polling_loop(cfg, sock.into()))
.spawn(move || polling_loop(cfg.clone(), socket) )
.expect("failure spawning thread");

threads.push(thread);
Expand Down
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017-2021 int08h LLC
// Copyright 2017-2024 int08h LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,8 @@
//! An implementation of the [Roughtime](https://roughtime.googlesource.com/roughtime)
//! secure time synchronization protocol.
//!
//! Roughtime aims to achieve rough time synchronisation in a secure way that doesn't
//! depend on any particular time server, and in such a way that, if a time server does
//! Roughtime aims to achieve rough time synchronization in a secure way that doesn't
//! depend on any particular timeserver, and in such a way that, if a timeserver does
//! misbehave, clients end up with cryptographic proof of it.
//!
//! # Protocol
Expand Down Expand Up @@ -79,7 +79,7 @@ pub mod stats;
pub mod version;

/// Version of Roughenough
pub const VERSION: &str = "1.2.1-draft8";
pub const VERSION: &str = "1.3.0-draft8";

/// Roughenough version string enriched with any compile-time optional features
pub fn roughenough_version() -> String {
Expand Down
8 changes: 4 additions & 4 deletions src/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl MerkleTree {
}

pub fn get_paths(&self, mut index: usize) -> Vec<u8> {
let mut paths = Vec::with_capacity(self.levels.len() * self.algorithm.output_len);
let mut paths = Vec::with_capacity(self.levels.len() * self.algorithm.output_len());
let mut level = 0;

while !self.levels[level].is_empty() {
Expand Down Expand Up @@ -95,7 +95,7 @@ impl MerkleTree {
}

if node_count % 2 != 0 {
self.levels[level - 1].push(vec![0; self.algorithm.output_len]);
self.levels[level - 1].push(vec![0; self.algorithm.output_len()]);
node_count += 1;
}

Expand Down Expand Up @@ -146,9 +146,9 @@ impl MerkleTree {
Hash::from(ctx.finish().as_ref())
};

assert_eq!(paths.len() % self.algorithm.output_len, 0);
assert_eq!(paths.len() % self.algorithm.output_len(), 0);

for path in paths.chunks(self.algorithm.output_len) {
for path in paths.chunks(self.algorithm.output_len()) {
let mut ctx = digest::Context::new(self.algorithm);
ctx.update(TREE_NODE_TWEAK);

Expand Down
18 changes: 8 additions & 10 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
use std::io::ErrorKind;
use std::io::Write;
use std::net::{IpAddr, Shutdown, SocketAddr};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use humansize::{file_size_opts as fsopts, FileSize};
use humansize::{BINARY, format_size};
use mio::net::{TcpListener, UdpSocket};
use mio::{Events, Poll, PollOpt, Ready, Token};
use mio_extras::timer::Timer;
Expand Down Expand Up @@ -56,7 +55,7 @@ const HTTP_RESPONSE: &str = "HTTP/1.1 200 OK\nContent-Length: 0\nConnection: clo
///
pub struct Server {
batch_size: u8,
socket: Arc<UdpSocket>,
socket: UdpSocket,
health_listener: Option<TcpListener>,
poll_duration: Option<Duration>,
status_interval: Duration,
Expand All @@ -80,7 +79,7 @@ impl Server {
/// Create a new server instance from the provided
/// [`ServerConfig`](../config/trait.ServerConfig.html) trait object instance.
///
pub fn new(config: &dyn ServerConfig, socket: Arc<UdpSocket>) -> Server {
pub fn new(config: &dyn ServerConfig, socket: UdpSocket) -> Server {
let mut timer: Timer<()> = Timer::default();
timer.set_timeout(config.status_interval(), ());

Expand Down Expand Up @@ -186,13 +185,12 @@ impl Server {

let socket_now_empty = self.collect_requests();

let sock_copy = Arc::get_mut(&mut self.socket).unwrap();
self.responder_rfc
.send_responses(sock_copy, &mut self.stats);
.send_responses(&mut self.socket, &mut self.stats);
self.responder_draft
.send_responses(sock_copy, &mut self.stats);
.send_responses(&mut self.socket, &mut self.stats);
self.responder_classic
.send_responses(sock_copy, &mut self.stats);
.send_responses(&mut self.socket, &mut self.stats);

if socket_now_empty {
break;
Expand Down Expand Up @@ -295,7 +293,7 @@ impl Server {
counts.invalid_requests,
counts.classic_responses_sent,
counts.rfc_responses_sent,
counts.bytes_sent.file_size(fsopts::BINARY).unwrap(),
format_size(counts.bytes_sent, BINARY),
counts.failed_send_attempts,
counts.retried_send_attempts
);
Expand All @@ -312,7 +310,7 @@ impl Server {
self.stats.total_responses_sent(),
self.stats.num_classic_responses_sent(),
self.stats.num_rfc_responses_sent(),
self.stats.total_bytes_sent().file_size(fsopts::BINARY).unwrap(),
format_size(self.stats.total_bytes_sent(), BINARY),
self.stats.total_failed_send_attempts(),
self.stats.total_retried_send_attempts()
);
Expand Down

0 comments on commit daa23fd

Please sign in to comment.