Skip to content

Commit

Permalink
feat(bin): don't allocate in server UDP recv path
Browse files Browse the repository at this point in the history
Previously the `neqo-bin` server would read a set of
datagrams from the socket and allocate them:

``` rust
let dgrams: Vec<Datagram> = dgrams.map(|d| d.to_owned()).collect();
```

This was done out of convenience, as handling `Datagram<&[u8]>`s, each borrowing
from `self.recv_buf`, is hard to get right across multiple `&mut self`
functions, that is here `self.run`, `self.process` and `self.find_socket`.

This commit combines `self.process` and `self.find_socket` and passes a socket
index, instead of the read `Datagram`s from `self.run` to `self.process`, thus
making the Rust borrow checker happy to handle borrowing `Datagram<&[u8]>`s
instead of owning `Datagram`s.
  • Loading branch information
mxinden committed Oct 26, 2024
1 parent 05b4af9 commit bdb5da9
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 37 deletions.
2 changes: 1 addition & 1 deletion neqo-bin/src/server/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl HttpServer {
}

impl super::HttpServer for HttpServer {
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output {
fn process(&mut self, dgram: Option<Datagram<&[u8]>>, now: Instant) -> Output {
self.server.process(dgram, now)
}

Expand Down
2 changes: 1 addition & 1 deletion neqo-bin/src/server/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Display for HttpServer {
}

impl super::HttpServer for HttpServer {
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> neqo_http3::Output {
fn process(&mut self, dgram: Option<Datagram<&[u8]>>, now: Instant) -> neqo_http3::Output {
self.server.process(dgram, now)
}

Expand Down
80 changes: 45 additions & 35 deletions neqo-bin/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use neqo_crypto::{
};
use neqo_http3::{Http3OrWebTransportStream, StreamId};
use neqo_transport::{server::ConnectionRef, Output, RandomConnectionIdGenerator, Version};
use neqo_udp::DatagramIter;
use tokio::time::Sleep;

use crate::{SharedArgs, STREAM_IO_BUFFER_SIZE};
Expand Down Expand Up @@ -194,7 +195,7 @@ fn qns_read_response(filename: &str) -> Result<Vec<u8>, io::Error> {

#[allow(clippy::module_name_repetitions)]
pub trait HttpServer: Display {
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output;
fn process(&mut self, dgram: Option<Datagram<&[u8]>>, now: Instant) -> Output;
fn process_events(&mut self, now: Instant);
fn has_events(&self) -> bool;
}
Expand Down Expand Up @@ -224,36 +225,57 @@ impl ServerRunner {
}
}

/// Tries to find a socket, but then just falls back to sending from the first.
fn find_socket(&mut self, addr: SocketAddr) -> &mut crate::udp::Socket {
let ((_host, first_socket), rest) = self.sockets.split_first_mut().unwrap();
rest.iter_mut()
.map(|(_host, socket)| socket)
.find(|socket| {
socket
.local_addr()
.ok()
.map_or(false, |socket_addr| socket_addr == addr)
})
.unwrap_or(first_socket)
}

async fn process(&mut self, mut dgram: Option<Datagram>) -> Result<(), io::Error> {
async fn process(&mut self, mut ready_socket_index: Option<usize>) -> Result<(), io::Error> {
let mut input_dgrams: Option<DatagramIter> = None;
loop {
match self.server.process(dgram.take(), (self.now)()) {
let input_dgram = if let Some(d) = input_dgrams.iter_mut().flatten().next() {
// Take datagram read in previous iteration.
Some(d)
} else {
// Try reading new datagrams from the socket.
if let Some(inx) = ready_socket_index {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
input_dgrams = socket.recv(*host, &mut self.recv_buf)?;
}
// Then take the first datagram, if any.
input_dgrams.iter_mut().flatten().next().map_or_else(
|| {
// Reading from the socket returned no datagrams. Don't try again.
ready_socket_index = None;
input_dgrams = None;
None
},
Some,
)
};

// Have server process in- and output datagrams.
match self.server.process(input_dgram, (self.now)()) {
Output::Datagram(dgram) => {
let socket = self.find_socket(dgram.source());
let ((_host, first_socket), rest) = self.sockets.split_first_mut().unwrap();
let socket = rest
.iter_mut()
.map(|(_host, socket)| socket)
.find(|socket| {
socket
.local_addr()
.ok()
.map_or(false, |socket_addr| socket_addr == dgram.source())
})
.unwrap_or(first_socket);
socket.writable().await?;
socket.send(&dgram)?;
continue;
}
Output::Callback(new_timeout) => {
qdebug!("Setting timeout of {:?}", new_timeout);
self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));
break;
}
Output::None => {
break;
}
Output::None => {}
}

if input_dgrams.is_none() && ready_socket_index.is_none() {
break;
}
}
Ok(())
Expand Down Expand Up @@ -289,19 +311,7 @@ impl ServerRunner {
}

match self.ready().await? {
Ready::Socket(inx) => loop {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
let Some(dgrams) = socket.recv(*host, &mut self.recv_buf)? else {
break;
};
if dgrams.len() == 0 {
break;
}
let dgrams: Vec<Datagram> = dgrams.map(|d| d.to_owned()).collect();
for dgram in dgrams {
self.process(Some(dgram)).await?;
}
},
Ready::Socket(inx) => self.process(Some(inx)).await?,
Ready::Timeout => {
self.timeout = None;
self.process(None).await?;
Expand Down

0 comments on commit bdb5da9

Please sign in to comment.