Skip to content

Commit

Permalink
fix(SendMessage): use SendStream::set_writable_event_low_watermark (#…
Browse files Browse the repository at this point in the history
…1838)

* fix(SendMessage): use SendStream::set_writable_event_low_watermark

Previously `SendMessage::send_data` could stall, if less than the minimum
message size is available to be sent. See
#1819 for details.

This commit implements solution (3) proposed in
#1819.

This commit introduces `SendStream::set_writable_event_low_watermark` which is
then used in `SendMessage::send_data` to signal to `SendStream` the minimum
required send space (low watermark) for the next send. Once reached,
`SendStream` emits a `SendStreamWritable` eventually triggering another
`SendMessage::send_data`.

Alternative to #1835. Compared to
#1835, this fix does not utilize the
`SendMessage` buffer, thus does not introduce an indirection to the send path.
In addition, under the assumption that available send space is increased in
larger batches, this fix does not send tiny data frames (2 byte header, 1 byte
goodput). Downside, compared to #1835, is
that it requires both changes in `neqo-transport` and `neqo-http3`.

Secondarily, this fixes #1821 as well.

* Move const

* Add documentation

* Add SendStream test

* Fix intra doc links

* Add neqo-http3 test

* Replace goodput with payload

* Re-trigger benchmarks

Let's see whether the "Download" benchmark is consistent.

* Rename emit_writable_event to maybe_emit_writable_event

* Replace expect with unwrap

* Use NonZeroUsize::get

* Replace expect with unwrap

* %s/Actually sending/Sending

* Typo

* Have update() return available amount

* Document setting once would suffice

* Reduce verbosity

* fix: drop RefCell mutable borrow early
  • Loading branch information
mxinden authored May 7, 2024
1 parent 20a4058 commit bb88aab
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 27 deletions.
12 changes: 10 additions & 2 deletions neqo-http3/src/send_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::{cell::RefCell, cmp::min, fmt::Debug, rc::Rc};
use std::{cell::RefCell, cmp::min, fmt::Debug, num::NonZeroUsize, rc::Rc};

use neqo_common::{qdebug, qtrace, Encoder, Header, MessageType};
use neqo_qpack::encoder::QPackEncoder;
Expand All @@ -17,6 +17,7 @@ use crate::{
SendStream, SendStreamEvents, Stream,
};

const MIN_DATA_FRAME_SIZE: usize = 3; // Minimal DATA frame size: 2 (header) + 1 (payload)
const MAX_DATA_HEADER_SIZE_2: usize = (1 << 6) - 1; // Maximal amount of data with DATA frame header size 2
const MAX_DATA_HEADER_SIZE_2_LIMIT: usize = MAX_DATA_HEADER_SIZE_2 + 3; // 63 + 3 (size of the next buffer data frame header)
const MAX_DATA_HEADER_SIZE_3: usize = (1 << 14) - 1; // Maximal amount of data with DATA frame header size 3
Expand Down Expand Up @@ -177,7 +178,14 @@ impl SendStream for SendMessage {
let available = conn
.stream_avail_send_space(self.stream_id())
.map_err(|e| Error::map_stream_send_errors(&e.into()))?;
if available <= 2 {
if available < MIN_DATA_FRAME_SIZE {
// Setting this once, instead of every time the available send space
// is exhausted, would suffice. That said, function call should be
// cheap, thus not worth optimizing.
conn.stream_set_writable_event_low_watermark(
self.stream_id(),
NonZeroUsize::new(MIN_DATA_FRAME_SIZE).unwrap(),
)?;
return Ok(0);
}
let to_send = if available <= MAX_DATA_HEADER_SIZE_2_LIMIT {
Expand Down
13 changes: 13 additions & 0 deletions neqo-http3/src/server_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ impl StreamHandler {
.send_data(self.stream_id(), buf, &mut self.conn.borrow_mut())
}

/// Bytes sendable on stream at the QUIC layer.
///
/// Note that this does not yet account for HTTP3 frame headers.
///
/// # Errors
///
/// It may return `InvalidStreamId` if a stream does not exist anymore.
pub fn available(&mut self) -> Res<usize> {
let stream_id = self.stream_id();
let n = self.conn.borrow_mut().stream_avail_send_space(stream_id)?;
Ok(n)
}

/// Close sending side.
///
/// # Errors
Expand Down
77 changes: 77 additions & 0 deletions neqo-http3/tests/httpconn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,83 @@ fn test_103_response() {
process_client_events(&mut hconn_c);
}

/// Test [`neqo_http3::SendMessage::send_data`] to set
/// [`neqo_transport::SendStream::set_writable_event_low_watermark`].
#[allow(clippy::cast_possible_truncation)]
#[test]
fn test_data_writable_events_low_watermark() -> Result<(), Box<dyn std::error::Error>> {
const STREAM_LIMIT: u64 = 5000;
const DATA_FRAME_HEADER_SIZE: usize = 3;

// Create a client and a server.
let mut hconn_c = http3_client_with_params(Http3Parameters::default().connection_parameters(
ConnectionParameters::default().max_stream_data(StreamType::BiDi, false, STREAM_LIMIT),
));
let mut hconn_s = default_http3_server();
mem::drop(connect_peers(&mut hconn_c, &mut hconn_s));

// Client sends GET to server.
let stream_id = hconn_c.fetch(
now(),
"GET",
&("https", "something.com", "/"),
&[],
Priority::default(),
)?;
hconn_c.stream_close_send(stream_id)?;
exchange_packets(&mut hconn_c, &mut hconn_s, None);

// Server receives GET and responds with headers.
let mut request = receive_request(&mut hconn_s).unwrap();
request.send_headers(&[Header::new(":status", "200")])?;

// Sending these headers clears the server's send stream buffer and thus
// emits a DataWritable event.
exchange_packets(&mut hconn_c, &mut hconn_s, None);
let data_writable = |e| {
matches!(
e,
Http3ServerEvent::DataWritable {
stream
} if stream.stream_id() == stream_id
)
};
assert!(hconn_s.events().any(data_writable));

// Have server fill entire send buffer minus 1 byte.
let all_but_one = request.available()? - DATA_FRAME_HEADER_SIZE - 1;
let buf = vec![1; all_but_one];
let sent = request.send_data(&buf)?;
assert_eq!(sent, all_but_one);
assert_eq!(request.available()?, 1);

// Sending the buffered data clears the send stream buffer and thus emits a
// DataWritable event.
exchange_packets(&mut hconn_c, &mut hconn_s, None);
assert!(hconn_s.events().any(data_writable));

// Sending more fails, given that each data frame needs to be preceeded by a
// header, i.e. needs more than 1 byte of send space to send 1 byte payload.
assert_eq!(request.available()?, 1);
assert_eq!(request.send_data(&buf)?, 0);

// Have the client read all the pending data.
let mut recv_buf = vec![0_u8; all_but_one];
let (recvd, _) = hconn_c.read_data(now(), stream_id, &mut recv_buf)?;
assert_eq!(sent, recvd);
exchange_packets(&mut hconn_c, &mut hconn_s, None);

// Expect the server's available send space to be back to the stream limit.
assert_eq!(request.available()?, STREAM_LIMIT as usize);

// Expect the server to emit a DataWritable event, even though it always had
// at least 1 byte available to send, i.e. it never exhausted the entire
// available send space.
assert!(hconn_s.events().any(data_writable));

Ok(())
}

#[test]
fn test_data_writable_events() {
const STREAM_LIMIT: u64 = 5000;
Expand Down
29 changes: 29 additions & 0 deletions neqo-transport/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
fmt::{self, Debug},
iter, mem,
net::{IpAddr, SocketAddr},
num::NonZeroUsize,
ops::RangeInclusive,
rc::{Rc, Weak},
time::{Duration, Instant},
Expand Down Expand Up @@ -3184,6 +3185,34 @@ impl Connection {
Ok(self.streams.get_send_stream(stream_id)?.avail())
}

/// Set low watermark for [`ConnectionEvent::SendStreamWritable`] event.
///
/// Stream emits a [`crate::ConnectionEvent::SendStreamWritable`] event
/// when:
/// - the available sendable bytes increased to or above the watermark
/// - and was previously below the watermark.
///
/// Default value is `1`. In other words
/// [`crate::ConnectionEvent::SendStreamWritable`] is emitted whenever the
/// available sendable bytes was previously at `0` and now increased to `1`
/// or more.
///
/// Use this when your protocol needs at least `watermark` amount of available
/// sendable bytes to make progress.
///
/// # Errors
/// When the stream ID is invalid.
pub fn stream_set_writable_event_low_watermark(
&mut self,
stream_id: StreamId,
watermark: NonZeroUsize,
) -> Res<()> {
self.streams
.get_send_stream_mut(stream_id)?
.set_writable_event_low_watermark(watermark);
Ok(())
}

/// Close the stream. Enqueued data will be sent.
/// # Errors
/// When the stream ID is invalid.
Expand Down
9 changes: 5 additions & 4 deletions neqo-transport/src/fc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,16 @@ where
}
}

/// Update the maximum. Returns `true` if the change was an increase.
pub fn update(&mut self, limit: u64) -> bool {
/// Update the maximum. Returns `Some` with the updated available flow
/// control if the change was an increase and `None` otherwise.
pub fn update(&mut self, limit: u64) -> Option<usize> {
debug_assert!(limit < u64::MAX);
if limit > self.limit {
self.limit = limit;
self.blocked_frame = false;
true
Some(self.available())
} else {
false
None
}
}

Expand Down
97 changes: 88 additions & 9 deletions neqo-transport/src/send_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
collections::{btree_map::Entry, BTreeMap, VecDeque},
hash::{Hash, Hasher},
mem,
num::NonZeroUsize,
ops::Add,
rc::Rc,
};
Expand Down Expand Up @@ -710,6 +711,7 @@ pub struct SendStream {
sendorder: Option<SendOrder>,
bytes_sent: u64,
fair: bool,
writable_event_low_watermark: NonZeroUsize,
}

impl Hash for SendStream {
Expand All @@ -726,6 +728,7 @@ impl PartialEq for SendStream {
impl Eq for SendStream {}

impl SendStream {
#[allow(clippy::missing_panics_doc)] // not possible
pub fn new(
stream_id: StreamId,
max_stream_data: u64,
Expand All @@ -745,6 +748,7 @@ impl SendStream {
sendorder: None,
bytes_sent: 0,
fair: false,
writable_event_low_watermark: 1.try_into().unwrap(),
};
if ss.avail() > 0 {
ss.conn_events.send_stream_writable(stream_id);
Expand Down Expand Up @@ -1128,10 +1132,10 @@ impl SendStream {
SendStreamState::Send {
ref mut send_buf, ..
} => {
let previous_limit = send_buf.avail();
send_buf.mark_as_acked(offset, len);
if self.avail() > 0 {
self.conn_events.send_stream_writable(self.stream_id);
}
let current_limit = send_buf.avail();
self.maybe_emit_writable_event(previous_limit, current_limit);
}
SendStreamState::DataSent {
ref mut send_buf,
Expand Down Expand Up @@ -1203,14 +1207,21 @@ impl SendStream {
}
}

/// Set low watermark for [`crate::ConnectionEvent::SendStreamWritable`]
/// event.
///
/// See [`crate::Connection::stream_set_writable_event_low_watermark`].
pub fn set_writable_event_low_watermark(&mut self, watermark: NonZeroUsize) {
self.writable_event_low_watermark = watermark;
}

pub fn set_max_stream_data(&mut self, limit: u64) {
if let SendStreamState::Ready { fc, .. } | SendStreamState::Send { fc, .. } =
&mut self.state
{
let stream_was_blocked = fc.available() == 0;
fc.update(limit);
if stream_was_blocked && self.avail() > 0 {
self.conn_events.send_stream_writable(self.stream_id);
let previous_limit = fc.available();
if let Some(current_limit) = fc.update(limit) {
self.maybe_emit_writable_event(previous_limit, current_limit);
}
}
}
Expand Down Expand Up @@ -1369,6 +1380,27 @@ impl SendStream {
pub(crate) fn state(&mut self) -> &mut SendStreamState {
&mut self.state
}

pub(crate) fn maybe_emit_writable_event(
&mut self,
previous_limit: usize,
current_limit: usize,
) {
let low_watermark = self.writable_event_low_watermark.get();

// Skip if:
// - stream was not constrained by limit before,
// - or stream is still constrained by limit,
// - or stream is constrained by different limit.
if low_watermark < previous_limit
|| current_limit < low_watermark
|| self.avail() < low_watermark
{
return;
}

self.conn_events.send_stream_writable(self.stream_id);
}
}

impl ::std::fmt::Display for SendStream {
Expand Down Expand Up @@ -1756,7 +1788,7 @@ pub struct SendStreamRecoveryToken {

#[cfg(test)]
mod tests {
use std::{cell::RefCell, collections::VecDeque, rc::Rc};
use std::{cell::RefCell, collections::VecDeque, num::NonZeroUsize, rc::Rc};

use neqo_common::{event::Provider, hex_with_len, qtrace, Encoder};

Expand Down Expand Up @@ -2450,7 +2482,7 @@ mod tests {
// Increasing conn max (conn:4, stream:4) will unblock but not emit
// event b/c that happens in Connection::emit_frame() (tested in
// connection.rs)
assert!(conn_fc.borrow_mut().update(4));
assert!(conn_fc.borrow_mut().update(4).is_some());
assert_eq!(conn_events.events().count(), 0);
assert_eq!(s.avail(), 2);
assert_eq!(s.send(b"hello").unwrap(), 2);
Expand All @@ -2476,6 +2508,53 @@ mod tests {
assert_eq!(s.send(b"hello").unwrap(), 0);
}

#[test]
fn send_stream_writable_event_gen_with_watermark() {
let conn_fc = connection_fc(0);
let mut conn_events = ConnectionEvents::default();

let mut s = SendStream::new(4.into(), 0, Rc::clone(&conn_fc), conn_events.clone());
// Set watermark at 3.
s.set_writable_event_low_watermark(NonZeroUsize::new(3).unwrap());

// Stream is initially blocked (conn:0, stream:0, watermark: 3) and will
// not accept data.
assert_eq!(s.avail(), 0);
assert_eq!(s.send(b"hi!").unwrap(), 0);

// Increasing the connection limit (conn:10, stream:0, watermark: 3) will not generate
// event or allow sending anything. Stream is constrained by stream limit.
assert!(conn_fc.borrow_mut().update(10).is_some());
assert_eq!(s.avail(), 0);
assert_eq!(conn_events.events().count(), 0);

// Increasing the connection limit further (conn:11, stream:0, watermark: 3) will not
// generate event or allow sending anything. Stream wasn't constrained by connection
// limit before.
assert!(conn_fc.borrow_mut().update(11).is_some());
assert_eq!(s.avail(), 0);
assert_eq!(conn_events.events().count(), 0);

// Increasing to (conn:11, stream:2, watermark: 3) will allow 2 bytes
// but not generate a SendStreamWritable event as it is still below the
// configured watermark.
s.set_max_stream_data(2);
assert_eq!(conn_events.events().count(), 0);
assert_eq!(s.avail(), 2);

// Increasing to (conn:11, stream:3, watermark: 3) will generate an
// event as available sendable bytes are >= watermark.
s.set_max_stream_data(3);
let evts = conn_events.events().collect::<Vec<_>>();
assert_eq!(evts.len(), 1);
assert!(matches!(
evts[0],
ConnectionEvent::SendStreamWritable { .. }
));

assert_eq!(s.send(b"hi!").unwrap(), 3);
}

#[test]
fn send_stream_writable_event_new_stream() {
let conn_fc = connection_fc(2);
Expand Down
Loading

0 comments on commit bb88aab

Please sign in to comment.