Skip to content

Commit

Permalink
feat(transport): accept borrowed instead of owned input datagram (#2187)
Browse files Browse the repository at this point in the history
* feat(transport): accept borrowed instead of owned input datagram

Previously `process_input` (and the like) took a `Datagram<Vec<u8>>` as input.
In other words, it required allocating each UDP datagram payload in a dedicated
`Vec<u8>` before passing it to `process_input`.

With this patch, `process_input` accepts a `Datagram<&[u8]>`. In other words, it
accepts a `Datagram` with a borrowed view into an existing buffer (`&[u8]`),
e.g. a long lived receive buffer.

* feat(udp): return borrowed Datagram on receive

Previously `recv_inner` would return `Datagram<Vec<u8>>`. In other words, it
would allocate a new `Vec<u8>` for each UDP datagram payload.

Now `recv_inner` reads into a provided buffer and returns `Datagram<&[u8]>`,
i.e. it returns a view into the provided buffer without allocating.

* feat(bin): don't allocate in UDP recv path (#2189)

Pass a long lived receive buffer to `neqo_udp::recv_inner`, receiving an
iterator of `Datagram<&[u8]>`s pointing into that buffer, thus no longer
allocating in UDP receive path.
  • Loading branch information
mxinden authored Oct 22, 2024
1 parent 0dcc9b2 commit 7c23ff6
Show file tree
Hide file tree
Showing 51 changed files with 1,465 additions and 1,409 deletions.
4 changes: 2 additions & 2 deletions fuzz/fuzz_targets/client_initial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fuzz_target!(|data: &[u8]| {
};

let mut client = default_client();
let ci = client.process(None, now()).dgram().expect("a datagram");
let ci = client.process_output(now()).dgram().expect("a datagram");
let Some((header, d_cid, s_cid, payload)) = decode_initial_header(&ci, Role::Client) else {
return;
};
Expand Down Expand Up @@ -60,7 +60,7 @@ fuzz_target!(|data: &[u8]| {
let fuzzed_ci = Datagram::new(ci.source(), ci.destination(), ci.tos(), ciphertext);

let mut server = default_server();
let _response = server.process(Some(&fuzzed_ci), now());
let _response = server.process(Some(fuzzed_ci), now());
});

#[cfg(any(not(fuzzing), windows))]
Expand Down
9 changes: 3 additions & 6 deletions fuzz/fuzz_targets/server_initial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ fuzz_target!(|data: &[u8]| {
};

let mut client = default_client();
let ci = client.process(None, now()).dgram().expect("a datagram");
let ci = client.process_output(now()).dgram().expect("a datagram");
let mut server = default_server();
let si = server
.process(Some(&ci), now())
.dgram()
.expect("a datagram");
let si = server.process(Some(ci), now()).dgram().expect("a datagram");

let Some((header, d_cid, s_cid, payload)) = decode_initial_header(&si, Role::Server) else {
return;
Expand Down Expand Up @@ -64,7 +61,7 @@ fuzz_target!(|data: &[u8]| {
(header_enc.len() - 1)..header_enc.len(),
);
let fuzzed_si = Datagram::new(si.source(), si.destination(), si.tos(), ciphertext);
let _response = client.process(Some(&fuzzed_si), now());
let _response = client.process(Some(fuzzed_si), now());
});

#[cfg(any(not(fuzzing), windows))]
Expand Down
9 changes: 5 additions & 4 deletions neqo-bin/src/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,11 @@ impl super::Client for Connection {
self.process_output(now)
}

fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
{
fn process_multiple_input<'a>(
&mut self,
dgrams: impl IntoIterator<Item = Datagram<&'a [u8]>>,
now: Instant,
) {
self.process_multiple_input(dgrams, now);
}

Expand Down
9 changes: 5 additions & 4 deletions neqo-bin/src/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,11 @@ impl super::Client for Http3Client {
self.process_output(now)
}

fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
{
fn process_multiple_input<'a>(
&mut self,
dgrams: impl IntoIterator<Item = Datagram<&'a [u8]>>,
now: Instant,
) {
self.process_multiple_input(dgrams, now);
}

Expand Down
62 changes: 35 additions & 27 deletions neqo-bin/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,11 @@ enum CloseState {
/// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`].
trait Client {
fn process_output(&mut self, now: Instant) -> Output;
fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>;
fn process_multiple_input<'a>(
&mut self,
dgrams: impl IntoIterator<Item = Datagram<&'a [u8]>>,
now: Instant,
);
fn has_events(&self) -> bool;
fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
where
Expand All @@ -392,9 +394,28 @@ struct Runner<'a, H: Handler> {
handler: H,
timeout: Option<Pin<Box<Sleep>>>,
args: &'a Args,
recv_buf: Vec<u8>,
}

impl<'a, H: Handler> Runner<'a, H> {
fn new(
local_addr: SocketAddr,
socket: &'a mut crate::udp::Socket,
client: H::Client,
handler: H,
args: &'a Args,
) -> Self {
Self {
local_addr,
socket,
client,
handler,
args,
timeout: None,
recv_buf: vec![0; neqo_udp::RECV_BUF_SIZE],
}
}

async fn run(mut self) -> Res<Option<ResumptionToken>> {
loop {
let handler_done = self.handler.handle(&mut self.client)?;
Expand Down Expand Up @@ -457,12 +478,13 @@ impl<'a, H: Handler> Runner<'a, H> {

async fn process_multiple_input(&mut self) -> Res<()> {
loop {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
let Some(dgrams) = self.socket.recv(&self.local_addr, &mut self.recv_buf)? else {
break;
};
if dgrams.len() == 0 {
break;
}
self.client
.process_multiple_input(dgrams.iter(), Instant::now());
self.client.process_multiple_input(dgrams, Instant::now());
self.process_output().await?;
}

Expand Down Expand Up @@ -573,32 +595,18 @@ pub async fn client(mut args: Args) -> Res<()> {

let handler = http09::Handler::new(to_request, &args);

Runner {
args: &args,
client,
handler,
local_addr: real_local,
socket: &mut socket,
timeout: None,
}
.run()
.await?
Runner::new(real_local, &mut socket, client, handler, &args)
.run()
.await?
} else {
let client = http3::create_client(&args, real_local, remote_addr, &hostname, token)
.expect("failed to create client");

let handler = http3::Handler::new(to_request, &args);

Runner {
args: &args,
client,
handler,
local_addr: real_local,
socket: &mut socket,
timeout: None,
}
.run()
.await?
Runner::new(real_local, &mut socket, client, handler, &args)
.run()
.await?
};
}
}
Expand Down
2 changes: 1 addition & 1 deletion neqo-bin/src/server/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl HttpServer {
}

impl super::HttpServer for HttpServer {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output {
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output {
self.server.process(dgram, now)
}

Expand Down
2 changes: 1 addition & 1 deletion neqo-bin/src/server/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Display for HttpServer {
}

impl super::HttpServer for HttpServer {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> neqo_http3::Output {
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> neqo_http3::Output {
self.server.process(dgram, now)
}

Expand Down
15 changes: 10 additions & 5 deletions neqo-bin/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ fn qns_read_response(filename: &str) -> Result<Vec<u8>, io::Error> {

#[allow(clippy::module_name_repetitions)]
pub trait HttpServer: Display {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output;
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output;
fn process_events(&mut self, now: Instant);
fn has_events(&self) -> bool;
}
Expand All @@ -205,6 +205,7 @@ pub struct ServerRunner {
server: Box<dyn HttpServer>,
timeout: Option<Pin<Box<Sleep>>>,
sockets: Vec<(SocketAddr, crate::udp::Socket)>,
recv_buf: Vec<u8>,
}

impl ServerRunner {
Expand All @@ -219,6 +220,7 @@ impl ServerRunner {
server,
timeout: None,
sockets,
recv_buf: vec![0; neqo_udp::RECV_BUF_SIZE],
}
}

Expand All @@ -236,7 +238,7 @@ impl ServerRunner {
.unwrap_or(first_socket)
}

async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> {
async fn process(&mut self, mut dgram: Option<Datagram>) -> Result<(), io::Error> {
loop {
match self.server.process(dgram.take(), (self.now)()) {
Output::Datagram(dgram) => {
Expand Down Expand Up @@ -289,12 +291,15 @@ impl ServerRunner {
match self.ready().await? {
Ready::Socket(inx) => loop {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
let dgrams = socket.recv(host)?;
if dgrams.is_empty() {
let Some(dgrams) = socket.recv(host, &mut self.recv_buf)? else {
break;
};
if dgrams.len() == 0 {
break;
}
let dgrams: Vec<Datagram> = dgrams.map(|d| d.to_owned()).collect();
for dgram in dgrams {
self.process(Some(&dgram)).await?;
self.process(Some(dgram)).await?;
}
},
Ready::Timeout => {
Expand Down
12 changes: 9 additions & 3 deletions neqo-bin/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use std::{io, net::SocketAddr};

use neqo_common::Datagram;
use neqo_udp::DatagramIter;

/// Ideally this would live in [`neqo-udp`]. [`neqo-udp`] is used in Firefox.
///
Expand Down Expand Up @@ -55,14 +56,19 @@ impl Socket {

/// Receive a batch of [`Datagram`]s on the given [`Socket`], each set with
/// the provided local address.
pub fn recv(&self, local_address: &SocketAddr) -> Result<Vec<Datagram>, io::Error> {
pub fn recv<'a>(
&self,
local_address: &SocketAddr,
recv_buf: &'a mut [u8],
) -> Result<Option<DatagramIter<'a>>, io::Error> {
self.inner
.try_io(tokio::io::Interest::READABLE, || {
neqo_udp::recv_inner(local_address, &self.state, &self.inner)
neqo_udp::recv_inner(local_address, &self.state, &self.inner, recv_buf)
})
.map(Some)
.or_else(|e| {
if e.kind() == io::ErrorKind::WouldBlock {
Ok(vec![])
Ok(None)
} else {
Err(e)
}
Expand Down
Loading

0 comments on commit 7c23ff6

Please sign in to comment.