Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transport): accept borrowed instead of owned input datagram #2187

Merged
merged 3 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions fuzz/fuzz_targets/client_initial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fuzz_target!(|data: &[u8]| {
};

let mut client = default_client();
let ci = client.process(None, now()).dgram().expect("a datagram");
let ci = client.process_output(now()).dgram().expect("a datagram");
let Some((header, d_cid, s_cid, payload)) = decode_initial_header(&ci, Role::Client) else {
return;
};
Expand Down Expand Up @@ -60,7 +60,7 @@ fuzz_target!(|data: &[u8]| {
let fuzzed_ci = Datagram::new(ci.source(), ci.destination(), ci.tos(), ciphertext);

let mut server = default_server();
let _response = server.process(Some(&fuzzed_ci), now());
let _response = server.process(Some(fuzzed_ci), now());
});

#[cfg(any(not(fuzzing), windows))]
Expand Down
9 changes: 3 additions & 6 deletions fuzz/fuzz_targets/server_initial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ fuzz_target!(|data: &[u8]| {
};

let mut client = default_client();
let ci = client.process(None, now()).dgram().expect("a datagram");
let ci = client.process_output(now()).dgram().expect("a datagram");
let mut server = default_server();
let si = server
.process(Some(&ci), now())
.dgram()
.expect("a datagram");
let si = server.process(Some(ci), now()).dgram().expect("a datagram");

let Some((header, d_cid, s_cid, payload)) = decode_initial_header(&si, Role::Server) else {
return;
Expand Down Expand Up @@ -64,7 +61,7 @@ fuzz_target!(|data: &[u8]| {
(header_enc.len() - 1)..header_enc.len(),
);
let fuzzed_si = Datagram::new(si.source(), si.destination(), si.tos(), ciphertext);
let _response = client.process(Some(&fuzzed_si), now());
let _response = client.process(Some(fuzzed_si), now());
});

#[cfg(any(not(fuzzing), windows))]
Expand Down
9 changes: 5 additions & 4 deletions neqo-bin/src/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,11 @@ impl super::Client for Connection {
self.process_output(now)
}

fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
{
fn process_multiple_input<'a>(
&mut self,
dgrams: impl IntoIterator<Item = Datagram<&'a [u8]>>,
now: Instant,
) {
self.process_multiple_input(dgrams, now);
}

Expand Down
9 changes: 5 additions & 4 deletions neqo-bin/src/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,11 @@ impl super::Client for Http3Client {
self.process_output(now)
}

fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
{
fn process_multiple_input<'a>(
&mut self,
dgrams: impl IntoIterator<Item = Datagram<&'a [u8]>>,
now: Instant,
) {
self.process_multiple_input(dgrams, now);
}

Expand Down
62 changes: 35 additions & 27 deletions neqo-bin/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,11 @@ enum CloseState {
/// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`].
trait Client {
fn process_output(&mut self, now: Instant) -> Output;
fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>;
fn process_multiple_input<'a>(
&mut self,
dgrams: impl IntoIterator<Item = Datagram<&'a [u8]>>,
now: Instant,
);
fn has_events(&self) -> bool;
fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
where
Expand All @@ -392,9 +394,28 @@ struct Runner<'a, H: Handler> {
handler: H,
timeout: Option<Pin<Box<Sleep>>>,
args: &'a Args,
recv_buf: Vec<u8>,
}

impl<'a, H: Handler> Runner<'a, H> {
fn new(
local_addr: SocketAddr,
socket: &'a mut crate::udp::Socket,
client: H::Client,
handler: H,
args: &'a Args,
) -> Self {
Self {
local_addr,
socket,
client,
handler,
args,
timeout: None,
recv_buf: vec![0; neqo_udp::RECV_BUF_SIZE],
}
}

async fn run(mut self) -> Res<Option<ResumptionToken>> {
loop {
let handler_done = self.handler.handle(&mut self.client)?;
Expand Down Expand Up @@ -457,12 +478,13 @@ impl<'a, H: Handler> Runner<'a, H> {

async fn process_multiple_input(&mut self) -> Res<()> {
loop {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
let Some(dgrams) = self.socket.recv(&self.local_addr, &mut self.recv_buf)? else {
break;
};
if dgrams.len() == 0 {
break;
}
self.client
.process_multiple_input(dgrams.iter(), Instant::now());
self.client.process_multiple_input(dgrams, Instant::now());
self.process_output().await?;
}

Expand Down Expand Up @@ -573,32 +595,18 @@ pub async fn client(mut args: Args) -> Res<()> {

let handler = http09::Handler::new(to_request, &args);

Runner {
args: &args,
client,
handler,
local_addr: real_local,
socket: &mut socket,
timeout: None,
}
.run()
.await?
Runner::new(real_local, &mut socket, client, handler, &args)
.run()
.await?
} else {
let client = http3::create_client(&args, real_local, remote_addr, &hostname, token)
.expect("failed to create client");

let handler = http3::Handler::new(to_request, &args);

Runner {
args: &args,
client,
handler,
local_addr: real_local,
socket: &mut socket,
timeout: None,
}
.run()
.await?
Runner::new(real_local, &mut socket, client, handler, &args)
.run()
.await?
};
}
}
Expand Down
2 changes: 1 addition & 1 deletion neqo-bin/src/server/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl HttpServer {
}

impl super::HttpServer for HttpServer {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output {
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output {
self.server.process(dgram, now)
}

Expand Down
2 changes: 1 addition & 1 deletion neqo-bin/src/server/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Display for HttpServer {
}

impl super::HttpServer for HttpServer {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> neqo_http3::Output {
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> neqo_http3::Output {
self.server.process(dgram, now)
}

Expand Down
15 changes: 10 additions & 5 deletions neqo-bin/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ fn qns_read_response(filename: &str) -> Result<Vec<u8>, io::Error> {

#[allow(clippy::module_name_repetitions)]
pub trait HttpServer: Display {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output;
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output;
fn process_events(&mut self, now: Instant);
fn has_events(&self) -> bool;
}
Expand All @@ -205,6 +205,7 @@ pub struct ServerRunner {
server: Box<dyn HttpServer>,
timeout: Option<Pin<Box<Sleep>>>,
sockets: Vec<(SocketAddr, crate::udp::Socket)>,
recv_buf: Vec<u8>,
}

impl ServerRunner {
Expand All @@ -219,6 +220,7 @@ impl ServerRunner {
server,
timeout: None,
sockets,
recv_buf: vec![0; neqo_udp::RECV_BUF_SIZE],
}
}

Expand All @@ -236,7 +238,7 @@ impl ServerRunner {
.unwrap_or(first_socket)
}

async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> {
async fn process(&mut self, mut dgram: Option<Datagram>) -> Result<(), io::Error> {
loop {
match self.server.process(dgram.take(), (self.now)()) {
Output::Datagram(dgram) => {
Expand Down Expand Up @@ -289,12 +291,15 @@ impl ServerRunner {
match self.ready().await? {
Ready::Socket(inx) => loop {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
let dgrams = socket.recv(host)?;
if dgrams.is_empty() {
let Some(dgrams) = socket.recv(host, &mut self.recv_buf)? else {
break;
};
if dgrams.len() == 0 {
break;
}
let dgrams: Vec<Datagram> = dgrams.map(|d| d.to_owned()).collect();
for dgram in dgrams {
self.process(Some(&dgram)).await?;
self.process(Some(dgram)).await?;
}
},
Ready::Timeout => {
Expand Down
12 changes: 9 additions & 3 deletions neqo-bin/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use std::{io, net::SocketAddr};

use neqo_common::Datagram;
use neqo_udp::DatagramIter;

/// Ideally this would live in [`neqo-udp`]. [`neqo-udp`] is used in Firefox.
///
Expand Down Expand Up @@ -55,14 +56,19 @@ impl Socket {

/// Receive a batch of [`Datagram`]s on the given [`Socket`], each set with
/// the provided local address.
pub fn recv(&self, local_address: &SocketAddr) -> Result<Vec<Datagram>, io::Error> {
pub fn recv<'a>(
&self,
local_address: &SocketAddr,
recv_buf: &'a mut [u8],
) -> Result<Option<DatagramIter<'a>>, io::Error> {
self.inner
.try_io(tokio::io::Interest::READABLE, || {
neqo_udp::recv_inner(local_address, &self.state, &self.inner)
neqo_udp::recv_inner(local_address, &self.state, &self.inner, recv_buf)
})
.map(Some)
.or_else(|e| {
if e.kind() == io::ErrorKind::WouldBlock {
Ok(vec![])
Ok(None)
} else {
Err(e)
}
Expand Down
Loading