Skip to content

Commit

Permalink
Yield endpoint-generated responses directly rather than buffering
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith committed Jun 3, 2023
1 parent 1cf68a3 commit c4367ae
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 90 deletions.
97 changes: 45 additions & 52 deletions quinn-proto/src/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{HashMap, VecDeque},
collections::HashMap,
convert::TryFrom,
fmt, iter,
net::{IpAddr, SocketAddr},
Expand Down Expand Up @@ -39,7 +39,6 @@ use crate::{
/// `handle_event`.
pub struct Endpoint {
rng: StdRng,
transmits: VecDeque<Transmit>,
index: ConnectionIndex,
connections: Slab<ConnectionMeta>,
local_cid_generator: Box<dyn ConnectionIdGenerator>,
Expand All @@ -62,7 +61,6 @@ impl Endpoint {
) -> Self {
Self {
rng: StdRng::from_entropy(),
transmits: VecDeque::new(),
index: ConnectionIndex::default(),
connections: Slab::new(),
local_cid_generator: (config.connection_id_generator_factory.as_ref())(),
Expand All @@ -72,12 +70,6 @@ impl Endpoint {
}
}

/// Get the next packet to transmit
#[must_use]
pub fn poll_transmit(&mut self) -> Option<Transmit> {
self.transmits.pop_front()
}

/// Replace the server configuration, affecting new incoming connections only
pub fn set_server_config(&mut self, server_config: Option<Arc<ServerConfig>>) {
self.server_config = server_config;
Expand Down Expand Up @@ -129,7 +121,7 @@ impl Endpoint {
local_ip: Option<IpAddr>,
ecn: Option<EcnCodepoint>,
data: BytesMut,
) -> Option<(ConnectionHandle, DatagramEvent)> {
) -> Option<DatagramEvent> {
let datagram_len = data.len();
let (first_decode, remaining) = match PartialDecode::new(
data,
Expand Down Expand Up @@ -165,14 +157,13 @@ impl Endpoint {
for &version in &self.config.supported_versions {
buf.write(version);
}
self.transmits.push_back(Transmit {
return Some(DatagramEvent::Response(Transmit {
destination: remote,
ecn: None,
contents: buf.freeze(),
segment_size: None,
src_ip: local_ip,
});
return None;
}));
}
Err(e) => {
trace!("malformed header: {}", e);
Expand All @@ -186,15 +177,15 @@ impl Endpoint {

let addresses = FourTuple { remote, local_ip };
if let Some(ch) = self.index.get(&addresses, &first_decode) {
return Some((
return Some(DatagramEvent::ConnectionEvent(
ch,
DatagramEvent::ConnectionEvent(ConnectionEvent(ConnectionEventInner::Datagram {
ConnectionEvent(ConnectionEventInner::Datagram {
now,
remote: addresses.remote,
ecn,
first_decode,
remaining,
})),
}),
));
}

Expand All @@ -207,8 +198,9 @@ impl Endpoint {
Some(config) => config,
None => {
debug!("packet for unrecognized connection {}", dst_cid);
self.stateless_reset(datagram_len, addresses, dst_cid);
return None;
return self
.stateless_reset(datagram_len, addresses, dst_cid)
.map(DatagramEvent::Response);
}
};

Expand All @@ -234,9 +226,9 @@ impl Endpoint {
}
};
return match first_decode.finish(Some(&*crypto.header.remote)) {
Ok(packet) => self
.handle_first_packet(now, addresses, ecn, packet, remaining, &crypto)
.map(|(ch, conn)| (ch, DatagramEvent::NewConnection(conn))),
Ok(packet) => {
self.handle_first_packet(now, addresses, ecn, packet, remaining, &crypto)
}
Err(e) => {
trace!("unable to decode initial packet: {}", e);
None
Expand All @@ -252,14 +244,15 @@ impl Endpoint {

//
// If we got this far, we're a server receiving a seemingly valid packet for an unknown
// connection. Send a stateless reset.
// connection. Send a stateless reset if possible.
//

if !dst_cid.is_empty() {
self.stateless_reset(datagram_len, addresses, dst_cid);
} else {
trace!("dropping unrecognized short packet without ID");
return self
.stateless_reset(datagram_len, addresses, dst_cid)
.map(DatagramEvent::Response);
}

trace!("dropping unrecognized short packet without ID");
None
}

Expand All @@ -268,7 +261,7 @@ impl Endpoint {
inciting_dgram_len: usize,
addresses: FourTuple,
dst_cid: &ConnectionId,
) {
) -> Option<Transmit> {
/// Minimum amount of padding for the stateless reset to look like a short-header packet
const MIN_PADDING_LEN: usize = 5;

Expand All @@ -278,7 +271,7 @@ impl Endpoint {
Some(headroom) if headroom > MIN_PADDING_LEN => headroom - 1,
_ => {
debug!("ignoring unexpected {} byte packet: not larger than minimum stateless reset size", inciting_dgram_len);
return;
return None;
}
};

Expand All @@ -302,13 +295,13 @@ impl Endpoint {

debug_assert!(buf.len() < inciting_dgram_len);

self.transmits.push_back(Transmit {
Some(Transmit {
destination: addresses.remote,
ecn: None,
contents: buf.freeze(),
segment_size: None,
src_ip: addresses.local_ip,
});
})
}

/// Initiate a connection
Expand Down Expand Up @@ -401,7 +394,7 @@ impl Endpoint {
mut packet: Packet,
rest: Option<BytesMut>,
crypto: &Keys,
) -> Option<(ConnectionHandle, Connection)> {
) -> Option<DatagramEvent> {
let (src_cid, dst_cid, token, packet_number, version) = match packet.header {
Header::Initial {
src_cid,
Expand Down Expand Up @@ -436,15 +429,14 @@ impl Endpoint {
if self.connections.len() >= server_config.concurrent_connections as usize || self.is_full()
{
debug!("refusing connection");
self.initial_close(
return Some(DatagramEvent::Response(self.initial_close(
version,
addresses,
crypto,
&src_cid,
&loc_cid,
TransportError::CONNECTION_REFUSED(""),
);
return None;
)));
}

if dst_cid.len() < 8
Expand All @@ -454,15 +446,14 @@ impl Endpoint {
"rejecting connection due to invalid DCID length {}",
dst_cid.len()
);
self.initial_close(
return Some(DatagramEvent::Response(self.initial_close(
version,
addresses,
crypto,
&src_cid,
&loc_cid,
TransportError::PROTOCOL_VIOLATION("invalid destination CID length"),
);
return None;
)));
}

let (retry_src_cid, orig_dst_cid) = if server_config.use_retry {
Expand Down Expand Up @@ -490,14 +481,13 @@ impl Endpoint {
buf.extend_from_slice(&server_config.crypto.retry_tag(version, &dst_cid, &buf));
encode.finish(&mut buf, &*crypto.header.local, None);

self.transmits.push_back(Transmit {
return Some(DatagramEvent::Response(Transmit {
destination: addresses.remote,
ecn: None,
contents: buf.freeze(),
segment_size: None,
src_ip: addresses.local_ip,
});
return None;
}));
}

match RetryToken::from_bytes(
Expand All @@ -513,15 +503,14 @@ impl Endpoint {
}
_ => {
debug!("rejecting invalid stateless retry token");
self.initial_close(
return Some(DatagramEvent::Response(self.initial_close(
version,
addresses,
crypto,
&src_cid,
&loc_cid,
TransportError::INVALID_TOKEN(""),
);
return None;
)));
}
}
} else {
Expand Down Expand Up @@ -559,15 +548,18 @@ impl Endpoint {
match conn.handle_first_packet(now, addresses.remote, ecn, packet_number, packet, rest) {
Ok(()) => {
trace!(id = ch.0, icid = %dst_cid, "connection incoming");
Some((ch, conn))
Some(DatagramEvent::NewConnection(ch, conn))
}
Err(e) => {
debug!("handshake failed: {}", e);
self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
if let ConnectionError::TransportError(e) = e {
self.initial_close(version, addresses, crypto, &src_cid, &loc_cid, e);
Some(DatagramEvent::Response(self.initial_close(
version, addresses, crypto, &src_cid, &loc_cid, e,
)))
} else {
None
}
None
}
}
}
Expand Down Expand Up @@ -622,7 +614,7 @@ impl Endpoint {
remote_id: &ConnectionId,
local_id: &ConnectionId,
reason: TransportError,
) {
) -> Transmit {
let number = PacketNumber::U8(0);
let header = Header::Initial {
dst_cid: *remote_id,
Expand All @@ -643,13 +635,13 @@ impl Endpoint {
&*crypto.header.local,
Some((0, &*crypto.packet.local)),
);
self.transmits.push_back(Transmit {
Transmit {
destination: addresses.remote,
ecn: None,
contents: buf.freeze(),
segment_size: None,
src_ip: addresses.local_ip,
})
}
}

/// Access the configuration used by this endpoint
Expand Down Expand Up @@ -690,7 +682,6 @@ impl fmt::Debug for Endpoint {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Endpoint")
.field("rng", &self.rng)
.field("transmits", &self.transmits)
.field("index", &self.index)
.field("connections", &self.connections)
.field("config", &self.config)
Expand Down Expand Up @@ -843,9 +834,11 @@ impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
#[allow(clippy::large_enum_variant)] // Not passed around extensively
pub enum DatagramEvent {
/// The datagram is redirected to its `Connection`
ConnectionEvent(ConnectionEvent),
ConnectionEvent(ConnectionHandle, ConnectionEvent),
/// The datagram has resulted in starting a new `Connection`
NewConnection(Connection),
NewConnection(ConnectionHandle, Connection),
/// Response generated directly by the endpoint
Response(Transmit),
}

/// Errors in the parameters being used to create a new connection
Expand Down
1 change: 1 addition & 0 deletions quinn-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ impl coding::Codec for StreamId {

/// An outgoing packet
#[derive(Debug)]
#[must_use]
pub struct Transmit {
/// The socket this datagram should be sent to
pub destination: SocketAddr,
Expand Down
21 changes: 8 additions & 13 deletions quinn-proto/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,13 @@ fn version_negotiate_server() {
// Long-header packet with reserved version number
hex!("80 0a1a2a3a 04 00000000 04 00000000 00")[..].into(),
);
assert!(event.is_none());

let io = server.poll_transmit();
assert!(io.is_some());
if let Some(Transmit { contents, .. }) = io {
assert_ne!(contents[0] & 0x80, 0);
assert_eq!(&contents[1..15], hex!("00000000 04 00000000 04 00000000"));
assert!(contents[15..].chunks(4).any(|x| {
DEFAULT_SUPPORTED_VERSIONS.contains(&u32::from_be_bytes(x.try_into().unwrap()))
}));
}
assert_matches!(server.poll_transmit(), None);
let Some(DatagramEvent::Response(Transmit { contents, .. })) = event else { panic!("expected a response"); };

assert_ne!(contents[0] & 0x80, 0);
assert_eq!(&contents[1..15], hex!("00000000 04 00000000 04 00000000"));
assert!(contents[15..].chunks(4).any(|x| {
DEFAULT_SUPPORTED_VERSIONS.contains(&u32::from_be_bytes(x.try_into().unwrap()))
}));
}

#[test]
Expand Down Expand Up @@ -81,7 +76,7 @@ fn version_negotiate_client() {
)[..]
.into(),
);
if let Some((_, DatagramEvent::ConnectionEvent(event))) = opt_event {
if let Some(DatagramEvent::ConnectionEvent(_, event)) = opt_event {
client_ch.handle_event(event);
}
assert_matches!(
Expand Down
13 changes: 6 additions & 7 deletions quinn-proto/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,26 +310,25 @@ impl TestEndpoint {

while self.inbound.front().map_or(false, |x| x.0 <= now) {
let (recv_time, ecn, packet) = self.inbound.pop_front().unwrap();
if let Some((ch, event)) = self.endpoint.handle(recv_time, remote, None, ecn, packet) {
if let Some(event) = self.endpoint.handle(recv_time, remote, None, ecn, packet) {
match event {
DatagramEvent::NewConnection(conn) => {
DatagramEvent::NewConnection(ch, conn) => {
self.connections.insert(ch, conn);
self.accepted = Some(ch);
}
DatagramEvent::ConnectionEvent(event) => {
DatagramEvent::ConnectionEvent(ch, event) => {
self.conn_events
.entry(ch)
.or_insert_with(VecDeque::new)
.push_back(event);
}
DatagramEvent::Response(transmit) => {
self.outbound.extend(split_transmit(transmit));
}
}
}
}

while let Some(x) = self.poll_transmit() {
self.outbound.extend(split_transmit(x));
}

loop {
let mut endpoint_events: Vec<(ConnectionHandle, EndpointEvent)> = vec![];
for (ch, conn) in self.connections.iter_mut() {
Expand Down
Loading

0 comments on commit c4367ae

Please sign in to comment.