Skip to content

Commit

Permalink
feat(bin): don't allocate in UDP recv path (#2189)
Browse files Browse the repository at this point in the history
Pass a long lived receive buffer to `neqo_udp::recv_inner`, receiving an
iterator of `Datagram<&[u8]>`s pointing into that buffer, thus no longer
allocating in UDP receive path.
  • Loading branch information
mxinden committed Oct 21, 2024
1 parent 78c54c7 commit 36b352f
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 48 deletions.
9 changes: 5 additions & 4 deletions neqo-bin/src/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,11 @@ impl super::Client for Connection {
self.process_output(now)
}

fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = Datagram<&'a [u8]>>,
{
fn process_multiple_input<'a>(
&mut self,
dgrams: impl IntoIterator<Item = Datagram<&'a [u8]>>,
now: Instant,
) {
self.process_multiple_input(dgrams, now);
}

Expand Down
9 changes: 5 additions & 4 deletions neqo-bin/src/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,11 @@ impl super::Client for Http3Client {
self.process_output(now)
}

fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = Datagram<&'a [u8]>>,
{
fn process_multiple_input<'a>(
&mut self,
dgrams: impl IntoIterator<Item = Datagram<&'a [u8]>>,
now: Instant,
) {
self.process_multiple_input(dgrams, now);
}

Expand Down
62 changes: 35 additions & 27 deletions neqo-bin/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,11 @@ enum CloseState {
/// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`].
trait Client {
fn process_output(&mut self, now: Instant) -> Output;
fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = Datagram<&'a [u8]>>;
fn process_multiple_input<'a>(
&mut self,
dgrams: impl IntoIterator<Item = Datagram<&'a [u8]>>,
now: Instant,
);
fn has_events(&self) -> bool;
fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
where
Expand All @@ -392,9 +394,28 @@ struct Runner<'a, H: Handler> {
handler: H,
timeout: Option<Pin<Box<Sleep>>>,
args: &'a Args,
recv_buf: Vec<u8>,
}

impl<'a, H: Handler> Runner<'a, H> {
fn new(
local_addr: SocketAddr,
socket: &'a mut crate::udp::Socket,
client: H::Client,
handler: H,
args: &'a Args,
) -> Self {
Self {
local_addr,
socket,
client,
handler,
args,
timeout: None,
recv_buf: vec![0; neqo_udp::RECV_BUF_SIZE],
}
}

async fn run(mut self) -> Res<Option<ResumptionToken>> {
loop {
let handler_done = self.handler.handle(&mut self.client)?;
Expand Down Expand Up @@ -457,12 +478,13 @@ impl<'a, H: Handler> Runner<'a, H> {

async fn process_multiple_input(&mut self) -> Res<()> {
loop {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
let Some(dgrams) = self.socket.recv(&self.local_addr, &mut self.recv_buf)? else {
break;
};
if dgrams.len() == 0 {
break;
}
self.client
.process_multiple_input(dgrams.iter().map(Datagram::borrow), Instant::now());
self.client.process_multiple_input(dgrams, Instant::now());
self.process_output().await?;
}

Expand Down Expand Up @@ -573,32 +595,18 @@ pub async fn client(mut args: Args) -> Res<()> {

let handler = http09::Handler::new(to_request, &args);

Runner {
args: &args,
client,
handler,
local_addr: real_local,
socket: &mut socket,
timeout: None,
}
.run()
.await?
Runner::new(real_local, &mut socket, client, handler, &args)
.run()
.await?
} else {
let client = http3::create_client(&args, real_local, remote_addr, &hostname, token)
.expect("failed to create client");

let handler = http3::Handler::new(to_request, &args);

Runner {
args: &args,
client,
handler,
local_addr: real_local,
socket: &mut socket,
timeout: None,
}
.run()
.await?
Runner::new(real_local, &mut socket, client, handler, &args)
.run()
.await?
};
}
}
Expand Down
2 changes: 1 addition & 1 deletion neqo-bin/src/server/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl HttpServer {
}

impl super::HttpServer for HttpServer {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output {
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output {
self.server.process(dgram, now)
}

Expand Down
2 changes: 1 addition & 1 deletion neqo-bin/src/server/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Display for HttpServer {
}

impl super::HttpServer for HttpServer {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> neqo_http3::Output {
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> neqo_http3::Output {
self.server.process(dgram, now)
}

Expand Down
15 changes: 10 additions & 5 deletions neqo-bin/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ fn qns_read_response(filename: &str) -> Result<Vec<u8>, io::Error> {

#[allow(clippy::module_name_repetitions)]
pub trait HttpServer: Display {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output;
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output;
fn process_events(&mut self, now: Instant);
fn has_events(&self) -> bool;
}
Expand All @@ -205,6 +205,7 @@ pub struct ServerRunner {
server: Box<dyn HttpServer>,
timeout: Option<Pin<Box<Sleep>>>,
sockets: Vec<(SocketAddr, crate::udp::Socket)>,
recv_buf: Vec<u8>,
}

impl ServerRunner {
Expand All @@ -219,6 +220,7 @@ impl ServerRunner {
server,
timeout: None,
sockets,
recv_buf: vec![0; neqo_udp::RECV_BUF_SIZE],
}
}

Expand All @@ -236,7 +238,7 @@ impl ServerRunner {
.unwrap_or(first_socket)
}

async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> {
async fn process(&mut self, mut dgram: Option<Datagram>) -> Result<(), io::Error> {
loop {
match self.server.process(dgram.take(), (self.now)()) {
Output::Datagram(dgram) => {
Expand Down Expand Up @@ -289,12 +291,15 @@ impl ServerRunner {
match self.ready().await? {
Ready::Socket(inx) => loop {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
let dgrams = socket.recv(host)?;
if dgrams.is_empty() {
let Some(dgrams) = socket.recv(host, &mut self.recv_buf)? else {
break;
};
if dgrams.len() == 0 {
break;
}
let dgrams: Vec<Datagram> = dgrams.map(|d| d.to_owned()).collect();
for dgram in dgrams {
self.process(Some(&dgram)).await?;
self.process(Some(dgram)).await?;
}
},
Ready::Timeout => {
Expand Down
15 changes: 9 additions & 6 deletions neqo-bin/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use std::{io, net::SocketAddr};

use neqo_common::Datagram;
use neqo_transport::RECV_BUFFER_SIZE;
use neqo_udp::DatagramIter;

/// Ideally this would live in [`neqo-udp`]. [`neqo-udp`] is used in Firefox.
///
Expand Down Expand Up @@ -56,16 +56,19 @@ impl Socket {

/// Receive a batch of [`Datagram`]s on the given [`Socket`], each set with
/// the provided local address.
pub fn recv(&self, local_address: &SocketAddr) -> Result<Vec<Datagram>, io::Error> {
let mut recv_buf = vec![0; RECV_BUFFER_SIZE];
pub fn recv<'a>(
&self,
local_address: &SocketAddr,
recv_buf: &'a mut [u8],
) -> Result<Option<DatagramIter<'a>>, io::Error> {
self.inner
.try_io(tokio::io::Interest::READABLE, || {
neqo_udp::recv_inner(local_address, &self.state, &self.inner, &mut recv_buf)
neqo_udp::recv_inner(local_address, &self.state, &self.inner, recv_buf)
})
.map(|dgrams| dgrams.map(|d| d.to_owned()).collect())
.map(Some)
.or_else(|e| {
if e.kind() == io::ErrorKind::WouldBlock {
Ok(vec![])
Ok(None)
} else {
Err(e)
}
Expand Down

0 comments on commit 36b352f

Please sign in to comment.