diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index 863b34dced..450e09f1bd 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -247,7 +247,7 @@ impl Args { } "resumption" => { if self.urls.len() < 2 { - qerror!("Warning: resumption test won't work without >1 URL"); + qerror!("Warning: resumption tests won't work without >1 URL"); exit(127); } self.shared.use_old_http = true; diff --git a/neqo-transport/src/cc/classic_cc.rs b/neqo-transport/src/cc/classic_cc.rs index 1130178bc0..35990755e5 100644 --- a/neqo-transport/src/cc/classic_cc.rs +++ b/neqo-transport/src/cc/classic_cc.rs @@ -337,6 +337,41 @@ impl CongestionControl for ClassicCongestionControl { congestion || persistent_congestion } + /// Handle a congestion event. + /// Returns true if this was a true congestion event. + fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool { + // Start a new congestion event if lost or ECN CE marked packet was sent + // after the start of the previous congestion recovery period. + if !self.after_recovery_start(last_packet) { + return false; + } + + let (cwnd, acked_bytes) = self.cc_algorithm.reduce_cwnd( + self.congestion_window, + self.acked_bytes, + self.max_datagram_size(), + ); + self.congestion_window = max(cwnd, self.cwnd_min()); + self.acked_bytes = acked_bytes; + self.ssthresh = self.congestion_window; + qdebug!( + [self], + "Cong event -> recovery; cwnd {}, ssthresh {}", + self.congestion_window, + self.ssthresh + ); + qlog::metrics_updated( + &self.qlog, + &[ + QlogMetric::CongestionWindow(self.congestion_window), + QlogMetric::SsThresh(self.ssthresh), + QlogMetric::InRecovery(true), + ], + ); + self.set_state(State::RecoveryStart); + true + } + /// Report received ECN CE mark(s) to the congestion controller as a /// congestion event. /// @@ -537,41 +572,6 @@ impl ClassicCongestionControl { !self.state.transient() && self.recovery_start.map_or(true, |pn| packet.pn() >= pn) } - /// Handle a congestion event. - /// Returns true if this was a true congestion event. - fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool { - // Start a new congestion event if lost or ECN CE marked packet was sent - // after the start of the previous congestion recovery period. - if !self.after_recovery_start(last_packet) { - return false; - } - - let (cwnd, acked_bytes) = self.cc_algorithm.reduce_cwnd( - self.congestion_window, - self.acked_bytes, - self.max_datagram_size(), - ); - self.congestion_window = max(cwnd, self.cwnd_min()); - self.acked_bytes = acked_bytes; - self.ssthresh = self.congestion_window; - qdebug!( - [self], - "Cong event -> recovery; cwnd {}, ssthresh {}", - self.congestion_window, - self.ssthresh - ); - qlog::metrics_updated( - &self.qlog, - &[ - QlogMetric::CongestionWindow(self.congestion_window), - QlogMetric::SsThresh(self.ssthresh), - QlogMetric::InRecovery(true), - ], - ); - self.set_state(State::RecoveryStart); - true - } - fn app_limited(&self) -> bool { if self.bytes_in_flight >= self.congestion_window { false diff --git a/neqo-transport/src/cc/mod.rs b/neqo-transport/src/cc/mod.rs index bbb47c4fd0..43eed5d26c 100644 --- a/neqo-transport/src/cc/mod.rs +++ b/neqo-transport/src/cc/mod.rs @@ -62,6 +62,11 @@ pub trait CongestionControl: Display + Debug { lost_packets: &[SentPacket], ) -> bool; + /// Initiate a congestion response. + /// + /// Returns true if the congestion window was reduced. + fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool; + /// Returns true if the congestion window was reduced. fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket) -> bool; diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index d8d9db2422..e29f96e8ec 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -634,8 +634,8 @@ impl Connection { /// only use it where a more precise value is not important. fn pto(&self) -> Duration { self.paths.primary().map_or_else( - || RttEstimate::default().pto(PacketNumberSpace::ApplicationData), - |p| p.borrow().rtt().pto(PacketNumberSpace::ApplicationData), + || RttEstimate::default().pto(self.confirmed()), + |p| p.borrow().rtt().pto(self.confirmed()), ) } @@ -1058,7 +1058,7 @@ impl Connection { if let Some(p) = self.paths.primary() { let path = p.borrow(); let rtt = path.rtt(); - let pto = rtt.pto(PacketNumberSpace::ApplicationData); + let pto = rtt.pto(self.confirmed()); let idle_time = self.idle_timeout.expiry(now, pto); qtrace!([self], "Idle/keepalive timer {:?}", idle_time); @@ -1525,7 +1525,7 @@ impl Connection { let mut dcid = None; qtrace!([self], "{} input {}", path.borrow(), hex(&**d)); - let pto = path.borrow().rtt().pto(PacketNumberSpace::ApplicationData); + let pto = path.borrow().rtt().pto(self.confirmed()); // Handle each packet in the datagram. while !slc.is_empty() { @@ -2138,7 +2138,7 @@ impl Connection { // or the PTO timer fired: probe. true } else { - let pto = path.borrow().rtt().pto(PacketNumberSpace::ApplicationData); + let pto = path.borrow().rtt().pto(self.confirmed()); if !builder.packet_empty() { // The packet only contains an ACK. Check whether we want to // force an ACK with a PING so we can stop tracking packets. @@ -2419,13 +2419,15 @@ impl Connection { self.loss_recovery.on_packet_sent(path, sent); } - if *space == PacketNumberSpace::Handshake - && self.role == Role::Server - && self.state == State::Confirmed - { - // We could discard handshake keys in set_state, - // but wait until after sending an ACK. - self.discard_keys(PacketNumberSpace::Handshake, now); + if *space == PacketNumberSpace::Handshake { + if self.role == Role::Client { + // We're sending a Handshake packet, so we can discard Initial keys. + self.discard_keys(PacketNumberSpace::Initial, now); + } else if self.role == Role::Server && self.state == State::Confirmed { + // We could discard handshake keys in set_state, + // but wait until after sending an ACK. + self.discard_keys(PacketNumberSpace::Handshake, now); + } } } @@ -2776,11 +2778,6 @@ impl Connection { self.set_initial_limits(); } if self.crypto.install_keys(self.role)? { - if self.role == Role::Client { - // We won't acknowledge Initial packets as a result of this, but the - // server can rely on implicit acknowledgment. - self.discard_keys(PacketNumberSpace::Initial, now); - } self.saved_datagrams.make_available(CryptoSpace::Handshake); } } @@ -2788,6 +2785,10 @@ impl Connection { Ok(()) } + fn confirmed(&self) -> bool { + self.state == State::Confirmed + } + fn set_confirmed(&mut self) -> Res<()> { self.set_state(State::Confirmed); if self.conn_params.pmtud_enabled() { diff --git a/neqo-transport/src/connection/tests/ecn.rs b/neqo-transport/src/connection/tests/ecn.rs index ca99282cf5..cfe453b38a 100644 --- a/neqo-transport/src/connection/tests/ecn.rs +++ b/neqo-transport/src/connection/tests/ecn.rs @@ -85,10 +85,19 @@ fn handshake_delay_with_ecn_blackhole() { drop_ecn_marked_datagrams(), ); + let pto = DEFAULT_RTT * 3; + let half_rtt = DEFAULT_RTT / 2; assert_eq!( - (finish - start).as_millis() / DEFAULT_RTT.as_millis(), - 15, - "expected 6 RTT for client to detect blackhole, 6 RTT for server to detect blackhole and 3 RTT for handshake to be confirmed.", + (finish - start), + (1 + 2 + 4) * pto + // Client RTOs its CI w/ECN twice (3x total) + half_rtt + // Fourth CI w/o ECN being delivered + (1 + 2 + 4) * pto + // Server RTOs its CI w/ECN twice (3x total) + half_rtt + // Fourth SI w/o ECN being delivered + half_rtt + // Client ACK + half_rtt + // Client Handshake/Short + half_rtt + // Server HandshakeDone + half_rtt + // Client ACK + half_rtt ); } diff --git a/neqo-transport/src/connection/tests/handshake.rs b/neqo-transport/src/connection/tests/handshake.rs index b70b024c79..3025e9c310 100644 --- a/neqo-transport/src/connection/tests/handshake.rs +++ b/neqo-transport/src/connection/tests/handshake.rs @@ -30,7 +30,7 @@ use super::{ }; use crate::{ connection::{ - tests::{new_client, new_server}, + tests::{exchange_ticket, new_client, new_server}, AddressValidation, }, events::ConnectionEvent, @@ -591,8 +591,9 @@ fn reorder_1rtt() { now += RTT / 2; let s2 = server.process(c2.as_ref(), now).dgram(); // The server has now received those packets, and saved them. - // The two additional are a Handshake and a 1-RTT (w/ NEW_CONNECTION_ID). - assert_eq!(server.stats().packets_rx, PACKETS * 2 + 4); + // The two additional are an Initial w/ACK, a Handshake w/ACK and a 1-RTT (w/ + // NEW_CONNECTION_ID). + assert_eq!(server.stats().packets_rx, PACKETS * 2 + 5); assert_eq!(server.stats().saved_datagrams, PACKETS); assert_eq!(server.stats().dropped_rx, 1); assert_eq!(*server.state(), State::Confirmed); @@ -802,9 +803,9 @@ fn anti_amplification() { let ack = client.process(Some(&s_init3), now).dgram().unwrap(); assert!(!maybe_authenticate(&mut client)); // No need yet. - // The client sends a padded datagram, with just ACK for Handshake. - assert_eq!(client.stats().frame_tx.ack, ack_count + 1); - assert_eq!(client.stats().frame_tx.all(), frame_count + 1); + // The client sends a padded datagram, with just ACKs for Initial and Handshake. + assert_eq!(client.stats().frame_tx.ack, ack_count + 2); + assert_eq!(client.stats().frame_tx.all(), frame_count + 2); assert_ne!(ack.len(), client.plpmtu()); // Not padded (it includes Handshake). now += DEFAULT_RTT / 2; @@ -1047,29 +1048,28 @@ fn only_server_initial() { let server_dgram1 = server.process(client_dgram.as_ref(), now).dgram(); let server_dgram2 = server.process_output(now + AT_LEAST_PTO).dgram(); - // Only pass on the Initial from the first. We should get a Handshake in return. + // Only pass on the Initial from the first. We should get an ACK in return. let (initial, handshake) = split_datagram(&server_dgram1.unwrap()); assert!(handshake.is_some()); - // The client will not acknowledge the Initial as it discards keys. - // It sends a Handshake probe instead, containing just a PING frame. - assert_eq!(client.stats().frame_tx.ping, 0); + // The client sends an Initial ACK. + assert_eq!(client.stats().frame_tx.ack, 0); let probe = client.process(Some(&initial), now).dgram(); - assertions::assert_handshake(&probe.unwrap()); + assertions::assert_initial(&probe.unwrap(), false); assert_eq!(client.stats().dropped_rx, 0); - assert_eq!(client.stats().frame_tx.ping, 1); + assert_eq!(client.stats().frame_tx.ack, 1); let (initial, handshake) = split_datagram(&server_dgram2.unwrap()); assert!(handshake.is_some()); - // The same happens after a PTO, even though the client will discard the Initial packet. + // The same happens after a PTO. now += AT_LEAST_PTO; - assert_eq!(client.stats().frame_tx.ping, 1); + assert_eq!(client.stats().frame_tx.ack, 1); let discarded = client.stats().dropped_rx; let probe = client.process(Some(&initial), now).dgram(); - assertions::assert_handshake(&probe.unwrap()); - assert_eq!(client.stats().frame_tx.ping, 2); - assert_eq!(client.stats().dropped_rx, discarded + 1); + assertions::assert_initial(&probe.unwrap(), false); + assert_eq!(client.stats().frame_tx.ack, 2); + assert_eq!(client.stats().dropped_rx, discarded); // Pass the Handshake packet and complete the handshake. client.process_input(&handshake.unwrap(), now); @@ -1136,7 +1136,9 @@ fn implicit_rtt_server() { let dgram = server.process(dgram.as_ref(), now).dgram(); now += RTT / 2; let dgram = client.process(dgram.as_ref(), now).dgram(); - assertions::assert_handshake(dgram.as_ref().unwrap()); + let (initial, handshake) = split_datagram(dgram.as_ref().unwrap()); + assertions::assert_initial(&initial, false); + assertions::assert_handshake(handshake.as_ref().unwrap()); now += RTT / 2; server.process_input(&dgram.unwrap(), now); @@ -1219,6 +1221,44 @@ fn client_initial_retransmits_identical() { } } +#[test] +fn client_triggered_zerortt_retransmits_identical() { + let mut client = default_client(); + let mut server = default_server(); + connect(&mut client, &mut server); + + let token = exchange_ticket(&mut client, &mut server, now()); + let mut client = default_client(); + client + .enable_resumption(now(), token) + .expect("should set token"); + let mut server = resumed_server(&client); + + // Write 0-RTT before generating any packets. + // This should result in a datagram that coalesces Initial and 0-RTT. + let client_stream_id = client.stream_create(StreamType::UniDi).unwrap(); + client.stream_send(client_stream_id, &[1, 2, 3]).unwrap(); + let client_0rtt = client.process(None, now()); + assert!(client_0rtt.as_dgram_ref().is_some()); + let stats1 = client.stats().frame_tx; + + assertions::assert_coalesced_0rtt(&client_0rtt.as_dgram_ref().unwrap()[..]); + + let s1 = server.process(client_0rtt.as_dgram_ref(), now()); + assert!(s1.as_dgram_ref().is_some()); // Should produce ServerHello etc... + + // Drop the Initial packet from this. + let (_, s_hs) = split_datagram(s1.as_dgram_ref().unwrap()); + assert!(s_hs.is_some()); + + // Passing only the server handshake packet to the client should trigger a retransmit. + _ = client.process(s_hs.as_ref(), now()).dgram(); + let stats2 = client.stats().frame_tx; + assert_eq!(stats2.all(), stats1.all() * 2); + assert_eq!(stats2.crypto, stats1.crypto * 2); + assert_eq!(stats2.stream, stats1.stream * 2); +} + #[test] fn server_initial_retransmits_identical() { let mut now = now(); @@ -1253,6 +1293,67 @@ fn server_initial_retransmits_identical() { } } +#[test] +fn server_triggered_initial_retransmits_identical() { + let mut now = now(); + let mut client = default_client(); + let mut server = default_server(); + + let ci = client.process(None, now).dgram(); + now += DEFAULT_RTT / 2; + let si1 = server.process(ci.as_ref(), now); + let stats1 = server.stats().frame_tx; + + // Drop si and wait for client to retransmit. + let pto = client.process_output(now).callback(); + now += pto; + let ci = client.process_output(now).dgram(); + + // Feed the RTX'ed ci into the server before its PTO fires. The server + // should process it and then retransmit its Initial packet including + // any coalesced Handshake data. + let si2 = server.process(ci.as_ref(), now); + let stats2 = server.stats().frame_tx; + assert_eq!(si1.dgram().unwrap().len(), si2.dgram().unwrap().len()); + assert_eq!(stats2.all(), stats1.all() * 2); + assert_eq!(stats2.crypto, stats1.crypto * 2); + assert_eq!(stats2.ack, stats1.ack * 2); +} + +#[test] +fn client_handshake_retransmits_identical() { + let mut now = now(); + let mut client = default_client(); + let mut ci = client.process(None, now).dgram(); + let mut server = default_server(); + let mut si = server.process(ci.take().as_ref(), now).dgram(); + + now += DEFAULT_RTT; + + _ = client.process(si.take().as_ref(), now).callback(); + maybe_authenticate(&mut client); + + // Force the client to retransmit its coalesced Handshake/Short packet a number of times and + // make sure the retranmissions are identical to the original. Also, verify the PTO + // durations. + for i in 1..=3 { + _ = client.process(None, now).dgram().unwrap(); + let pto = client.process(None, now).callback(); + assert_eq!(pto, DEFAULT_RTT * 3 * (1 << (i - 1))); + now += pto; + + assert_eq!( + client.stats().frame_tx, + FrameStats { + crypto: i + 1, + ack: i + 1, + new_connection_id: i * 7, + ..Default::default() + } + ); + } +} + #[test] fn grease_quic_bit_transport_parameter() { fn get_remote_tp(conn: &Connection) -> bool { diff --git a/neqo-transport/src/connection/tests/idle.rs b/neqo-transport/src/connection/tests/idle.rs index 8677d7f5d5..6694e3bc19 100644 --- a/neqo-transport/src/connection/tests/idle.rs +++ b/neqo-transport/src/connection/tests/idle.rs @@ -721,6 +721,8 @@ fn keep_alive_with_ack_eliciting_packet_lost() { assert!(retransmit.is_some()); let retransmit = client.process_output(now).dgram(); assert!(retransmit.is_some()); + let retransmit = client.process_output(now).dgram(); + assert!(retransmit.is_some()); // The next callback will be an idle timeout. assert_eq!( diff --git a/neqo-transport/src/connection/tests/recovery.rs b/neqo-transport/src/connection/tests/recovery.rs index a97df1ca64..f19e9a2573 100644 --- a/neqo-transport/src/connection/tests/recovery.rs +++ b/neqo-transport/src/connection/tests/recovery.rs @@ -174,10 +174,12 @@ fn pto_initial() { assert_eq!(delay, INITIAL_PTO); // Resend initial after PTO. + let cwnd_prior: usize = cwnd(&client); now += delay; let pkt2 = client.process(None, now).dgram(); assert!(pkt2.is_some()); assert_eq!(pkt2.unwrap().len(), client.plpmtu()); + assert_eq!(cwnd_prior, 2 * cwnd(&client)); // cwnd has halved let delay = client.process(None, now).callback(); // PTO has doubled. @@ -224,7 +226,9 @@ fn pto_handshake_complete() { now += HALF_RTT; let pkt = client.process(pkt.as_ref(), now).dgram(); - assert_handshake(pkt.as_ref().unwrap()); + let (initial, handshake) = split_datagram(&pkt.clone().unwrap()); + assert_initial(&initial, false); + assert_handshake(handshake.as_ref().unwrap()); let cb = client.process(None, now).callback(); // The client now has a single RTT estimate (20ms), so @@ -250,7 +254,6 @@ fn pto_handshake_complete() { assert_eq!(client.stats.borrow().pto_counts, pto_counts); // Wait for PTO to expire and resend a handshake packet. - // Wait long enough that the 1-RTT PTO also fires. qdebug!("---- client: PTO"); now += HALF_RTT * 6; let pkt2 = client.process(None, now).dgram(); @@ -259,17 +262,11 @@ fn pto_handshake_complete() { pto_counts[0] = 1; assert_eq!(client.stats.borrow().pto_counts, pto_counts); - // Get a second PTO packet. - // Add some application data to this datagram, then split the 1-RTT off. + // Split the 1-RTT off. // We'll use that packet to force the server to acknowledge 1-RTT. - let stream_id = client.stream_create(StreamType::UniDi).unwrap(); - client.stream_close_send(stream_id).unwrap(); - now += HALF_RTT * 6; - let pkt3 = client.process(None, now).dgram(); - assert_handshake(pkt3.as_ref().unwrap()); - let (pkt3_hs, pkt3_1rtt) = split_datagram(&pkt3.unwrap()); - assert_handshake(&pkt3_hs); - assert!(pkt3_1rtt.is_some()); + let (pkt2_hs, pkt2_1rtt) = split_datagram(&pkt2.unwrap()); + assert_handshake(&pkt2_hs); + assert!(pkt2_1rtt.is_some()); // PTO has been doubled. let cb = client.process(None, now).callback(); @@ -286,7 +283,7 @@ fn pto_handshake_complete() { // This should remove the 1-RTT PTO from messing this test up. let server_acks = server.stats().frame_tx.ack; let server_done = server.stats().frame_tx.handshake_done; - server.process_input(&pkt3_1rtt.unwrap(), now); + server.process_input(&pkt2_1rtt.unwrap(), now); let ack = server.process(pkt1.as_ref(), now).dgram(); assert!(ack.is_some()); assert_eq!(server.stats().frame_tx.ack, server_acks + 2); @@ -295,19 +292,15 @@ fn pto_handshake_complete() { // Check that the other packets (pkt2, pkt3) are Handshake packets. // The server discarded the Handshake keys already, therefore they are dropped. // Note that these don't include 1-RTT packets, because 1-RTT isn't send on PTO. - let (pkt2_hs, pkt2_1rtt) = split_datagram(&pkt2.unwrap()); - assert_handshake(&pkt2_hs); - assert!(pkt2_1rtt.is_some()); let dropped_before1 = server.stats().dropped_rx; let server_frames = server.stats().frame_rx.all(); server.process_input(&pkt2_hs, now); assert_eq!(1, server.stats().dropped_rx - dropped_before1); assert_eq!(server.stats().frame_rx.all(), server_frames); - server.process_input(&pkt2_1rtt.unwrap(), now); let server_frames2 = server.stats().frame_rx.all(); let dropped_before2 = server.stats().dropped_rx; - server.process_input(&pkt3_hs, now); + server.process_input(&pkt2_hs, now); assert_eq!(1, server.stats().dropped_rx - dropped_before2); assert_eq!(server.stats().frame_rx.all(), server_frames2); diff --git a/neqo-transport/src/connection/tests/zerortt.rs b/neqo-transport/src/connection/tests/zerortt.rs index 0411103407..c34e1e673b 100644 --- a/neqo-transport/src/connection/tests/zerortt.rs +++ b/neqo-transport/src/connection/tests/zerortt.rs @@ -6,17 +6,25 @@ use std::{cell::RefCell, rc::Rc, time::Duration}; -use neqo_common::{event::Provider, qdebug}; +use neqo_common::{event::Provider, qdebug, Datagram, Decoder, Role}; use neqo_crypto::{AllowZeroRtt, AntiReplay}; -use test_fixture::{assertions, now}; +use test_fixture::{ + assertions, + header_protection::{ + apply_header_protection, decode_initial_header, initial_aead_and_hp, + remove_header_protection, + }, + now, split_datagram, +}; use super::{ super::Connection, connect, default_client, default_server, exchange_ticket, new_server, resumed_server, CountingConnectionIdGenerator, }; use crate::{ - events::ConnectionEvent, ConnectionParameters, Error, StreamType, Version, - MIN_INITIAL_PACKET_SIZE, + connection::tests::{new_client, DEFAULT_RTT}, + events::ConnectionEvent, + ConnectionParameters, Error, StreamType, Version, MIN_INITIAL_PACKET_SIZE, }; #[test] @@ -320,3 +328,156 @@ fn zero_rtt_loss_accepted() { ); } } + +#[test] +#[allow(clippy::too_many_lines)] +fn zerortt_reorder_frames() { + const ACK_FRAME: &[u8] = &[3, 0, 0, 0, 0, 1, 0, 0]; + const ACK_FRAME_2: &[u8] = &[3, 1, 0, 0, 1, 2, 0, 0]; + + let mut client = new_client( + ConnectionParameters::default() + .versions(Version::Version1, vec![Version::Version1]) + .grease(false), + ); + let mut server = new_server( + ConnectionParameters::default() + .versions(Version::Version1, vec![Version::Version1]) + .grease(false), + ); + let mut now = now(); + connect(&mut client, &mut server); + + let token = exchange_ticket(&mut client, &mut server, now); + let mut client = new_client( + ConnectionParameters::default() + .versions(Version::Version1, vec![Version::Version1]) + .grease(false), + ); + client + .enable_resumption(now, token) + .expect("should set token"); + let mut server = resumed_server(&client); + + // Write 0-RTT before generating any packets. + // This should result in a datagram that coalesces Initial and 0-RTT. + let client_stream_id = client.stream_create(StreamType::UniDi).unwrap(); + client.stream_send(client_stream_id, &[1, 2, 3]).unwrap(); + let client_0rtt = client.process(None, now); + assert!(client_0rtt.as_dgram_ref().is_some()); + assertions::assert_coalesced_0rtt(&client_0rtt.as_dgram_ref().unwrap()[..]); + + let (_, client_dcid, _, _) = + decode_initial_header(client_0rtt.as_dgram_ref().unwrap(), Role::Client).unwrap(); + let client_dcid = client_dcid.to_owned(); + + now += DEFAULT_RTT / 2; + let server_hs = server.process(client_0rtt.as_dgram_ref(), now); + assert!(server_hs.as_dgram_ref().is_some()); // Should produce ServerHello etc... + + let server_stream_id = server + .events() + .find_map(|evt| match evt { + ConnectionEvent::NewStream { stream_id } => Some(stream_id), + _ => None, + }) + .expect("should have received a new stream event"); + assert_eq!(client_stream_id, server_stream_id.as_u64()); + + // Now, only deliver the ACK from the server's Intial packet. + let (server_initial, _server_hs) = split_datagram(server_hs.as_dgram_ref().unwrap()); + let (protected_header, _, _, payload) = + decode_initial_header(&server_initial, Role::Server).unwrap(); + + // Now decrypt the packet. + let (aead, hp) = initial_aead_and_hp(&client_dcid, Role::Server); + let (header, pn) = remove_header_protection(&hp, protected_header, payload); + assert_eq!(pn, 0); + let pn_len = header.len() - protected_header.len(); + let mut buf = vec![0; payload.len()]; + let mut plaintext = aead + .decrypt(pn, &header, &payload[pn_len..], &mut buf) + .unwrap() + .to_owned(); + + // Now we need to find the frames. Make some really strong assumptions. + let mut dec = Decoder::new(&plaintext[..]); + assert_eq!(dec.decode(ACK_FRAME.len()), Some(ACK_FRAME)); + let end = dec.offset(); + + // Remove the CRYPTO frame. + plaintext[end..].fill(0); + + // And rebuild a packet. + let mut packet = header.clone(); + packet.resize(MIN_INITIAL_PACKET_SIZE, 0); + aead.encrypt(pn, &header, &plaintext, &mut packet[header.len()..]) + .unwrap(); + apply_header_protection(&hp, &mut packet, protected_header.len()..header.len()); + let modified = Datagram::new( + server_initial.source(), + server_initial.destination(), + server_initial.tos(), + packet, + ); + + // Deliver the ACK and make the client RTX. + now += DEFAULT_RTT / 2; + now += client.process(Some(&modified), now).callback(); + let client_out = client.process(None, now); + + // The server should get the retransmission. + now += DEFAULT_RTT / 2; + let server_initial = server.process(client_out.as_dgram_ref(), now); + let (server_initial, _) = split_datagram(server_initial.as_dgram_ref().unwrap()); + + // Reorder the ACK and CRYPTO frames in the server's Initial packet. + let (protected_header, _, _, payload) = + decode_initial_header(&server_initial, Role::Server).unwrap(); + + // Now decrypt the packet. + let (aead, hp) = initial_aead_and_hp(&client_dcid, Role::Server); + let (header, pn) = remove_header_protection(&hp, protected_header, payload); + assert_eq!(pn, 1); + let pn_len = header.len() - protected_header.len(); + let mut buf = vec![0; payload.len()]; + let mut plaintext = aead + .decrypt(pn, &header, &payload[pn_len..], &mut buf) + .unwrap() + .to_owned(); + + // Now we need to find the frames. Make some really strong assumptions. + let mut dec = Decoder::new(&plaintext[..]); + assert_eq!(dec.decode(ACK_FRAME_2.len()), Some(ACK_FRAME_2)); + assert_eq!(dec.decode_varint(), Some(0x06)); // CRYPTO + assert_eq!(dec.decode_varint(), Some(0x00)); // offset + dec.skip_vvec(); // Skip over the payload. + let end = dec.offset(); + + // Move the ACK frame after the CRYPTO frame. + plaintext[..end].rotate_left(ACK_FRAME_2.len()); + + // And rebuild a packet. + let mut packet = header.clone(); + packet.resize(MIN_INITIAL_PACKET_SIZE, 0); + aead.encrypt(pn, &header, &plaintext, &mut packet[header.len()..]) + .unwrap(); + apply_header_protection(&hp, &mut packet, protected_header.len()..header.len()); + let modified = Datagram::new( + server_initial.source(), + server_initial.destination(), + server_initial.tos(), + packet, + ); + + // Deliver the server's Initial (ACK + CRYPTO) after a delay long enough to trigger the + // application space PTO. + now += DEFAULT_RTT * 5; + let probe = client.process(Some(&modified), now).dgram().unwrap(); + assertions::assert_initial(&probe[..], true); + + now += client.process(None, now).callback(); + let probe = client.process(None, now).dgram().unwrap(); + assertions::assert_initial(&probe[..], true); + assertions::assert_coalesced_0rtt(&probe[..]); +} diff --git a/neqo-transport/src/path.rs b/neqo-transport/src/path.rs index 49c289f60b..1e564b251e 100644 --- a/neqo-transport/src/path.rs +++ b/neqo-transport/src/path.rs @@ -30,7 +30,6 @@ use crate::{ rtt::{RttEstimate, RttSource}, sender::PacketSender, stats::FrameStats, - tracking::PacketNumberSpace, Stats, }; @@ -1020,7 +1019,7 @@ impl Path { pub fn on_packets_lost( &mut self, prev_largest_acked_sent: Option, - space: PacketNumberSpace, + confirmed: bool, lost_packets: &[SentPacket], stats: &mut Stats, now: Instant, @@ -1030,7 +1029,7 @@ impl Path { let cwnd_reduced = self.sender.on_packets_lost( self.rtt.first_sample_time(), prev_largest_acked_sent, - self.rtt.pto(space), // Important: the base PTO, not adjusted. + self.rtt.pto(confirmed), // Important: the base PTO, not adjusted. lost_packets, stats, now, @@ -1040,6 +1039,18 @@ impl Path { } } + /// Initiate a congestion response. + /// + /// Returns true if the congestion window was reduced. + pub fn on_congestion_event(&mut self, lost_packets: &[SentPacket], stats: &mut Stats) -> bool { + if let Some(last) = lost_packets.last() { + self.ecn_info.on_packets_lost(lost_packets, stats); + self.sender.on_congestion_event(last) + } else { + false + } + } + /// Determine whether we should be setting a PTO for this path. This is true when either the /// path is valid or when there is enough remaining in the amplification limit to fit a /// full-sized path (i.e., the path MTU). diff --git a/neqo-transport/src/recovery/mod.rs b/neqo-transport/src/recovery/mod.rs index f2c3e8e298..4a41ccb983 100644 --- a/neqo-transport/src/recovery/mod.rs +++ b/neqo-transport/src/recovery/mod.rs @@ -165,18 +165,15 @@ impl LossRecoverySpace { self.in_flight_outstanding > 0 } - pub fn pto_packets(&mut self, count: usize) -> impl Iterator { - self.sent_packets - .iter_mut() - .filter_map(|sent| { - if sent.pto() { - qtrace!("PTO: marking packet {} lost ", sent.pn()); - Some(&*sent) - } else { - None - } - }) - .take(count) + pub fn pto_packets(&mut self) -> impl Iterator { + self.sent_packets.iter_mut().filter_map(|sent| { + if sent.pto() { + qtrace!("PTO: marking packet {} lost ", sent.pn()); + Some(&*sent) + } else { + None + } + }) } pub fn pto_base_time(&self) -> Option { @@ -316,13 +313,15 @@ impl LossRecoverySpace { ); self.first_ooo_time = None; - let largest_acked = self.largest_acked; + let Some(largest_acked) = self.largest_acked else { + return; + }; for packet in self .sent_packets .iter_mut() // BTreeMap iterates in order of ascending PN - .take_while(|p| p.pn() < largest_acked.unwrap_or(PacketNumber::MAX)) + .take_while(|p| p.pn() < largest_acked) { // Packets sent before now - loss_delay are deemed lost. if packet.time_sent() + loss_delay <= now { @@ -332,7 +331,7 @@ impl LossRecoverySpace { packet.time_sent(), loss_delay ); - } else if largest_acked >= Some(packet.pn() + PACKET_THRESHOLD) { + } else if largest_acked >= packet.pn() + PACKET_THRESHOLD { qtrace!( "lost={}, is >= {} from largest acked {:?}", packet.pn(), @@ -340,9 +339,7 @@ impl LossRecoverySpace { largest_acked ); } else { - if largest_acked.is_some() { - self.first_ooo_time = Some(packet.time_sent()); - } + self.first_ooo_time = Some(packet.time_sent()); // No more packets can be declared lost after this one. break; }; @@ -627,7 +624,7 @@ impl LossRecovery { // as we rely on the count of in-flight packets to determine whether to send // another probe. Removing them too soon would result in not sending on PTO. let loss_delay = primary_path.borrow().rtt().loss_delay(); - let cleanup_delay = self.pto_period(primary_path.borrow().rtt(), pn_space); + let cleanup_delay = self.pto_period(primary_path.borrow().rtt()); let mut lost = Vec::new(); self.spaces.get_mut(pn_space).unwrap().detect_lost_packets( now, @@ -642,7 +639,7 @@ impl LossRecovery { // backoff, so that we can determine persistent congestion. primary_path.borrow_mut().on_packets_lost( prev_largest_acked, - pn_space, + self.is_confirmed(), &lost, &mut self.stats.borrow_mut(), now, @@ -679,6 +676,10 @@ impl LossRecovery { dropped } + const fn is_confirmed(&self) -> bool { + self.confirmed_time.is_some() + } + fn confirmed(&mut self, rtt: &RttEstimate, now: Instant) { debug_assert!(self.confirmed_time.is_none()); self.confirmed_time = Some(now); @@ -757,41 +758,42 @@ impl LossRecovery { fn pto_period_inner( rtt: &RttEstimate, pto_state: Option<&PtoState>, - pn_space: PacketNumberSpace, + confirmed: bool, fast_pto: u8, ) -> Duration { // This is a complicated (but safe) way of calculating: // base_pto * F * 2^pto_count // where F = fast_pto / FAST_PTO_SCALE (== 1 by default) let pto_count = pto_state.map_or(0, |p| u32::try_from(p.count).unwrap_or(0)); - rtt.pto(pn_space) + rtt.pto(confirmed) .checked_mul(u32::from(fast_pto) << min(pto_count, u32::BITS - u8::BITS)) .map_or(Duration::from_secs(3600), |p| p / u32::from(FAST_PTO_SCALE)) } /// Get the current PTO period for the given packet number space. /// Unlike calling `RttEstimate::pto` directly, this includes exponential backoff. - fn pto_period(&self, rtt: &RttEstimate, pn_space: PacketNumberSpace) -> Duration { - Self::pto_period_inner(rtt, self.pto_state.as_ref(), pn_space, self.fast_pto) + fn pto_period(&self, rtt: &RttEstimate) -> Duration { + Self::pto_period_inner( + rtt, + self.pto_state.as_ref(), + self.is_confirmed(), + self.fast_pto, + ) } // Calculate PTO time for the given space. fn pto_time(&self, rtt: &RttEstimate, pn_space: PacketNumberSpace) -> Option { - if self.confirmed_time.is_none() && pn_space == PacketNumberSpace::ApplicationData { - None - } else { - self.spaces.get(pn_space).and_then(|space| { - space - .pto_base_time() - .map(|t| t + self.pto_period(rtt, pn_space)) + self.spaces + .get(pn_space) + .and_then(|space: &LossRecoverySpace| { + space.pto_base_time().map(|t| t + self.pto_period(rtt)) }) - } } /// Find the earliest PTO time for all active packet number spaces. /// Ignore Application if either Initial or Handshake have an active PTO. fn earliest_pto(&self, rtt: &RttEstimate) -> Option { - if self.confirmed_time.is_some() { + if self.is_confirmed() { self.pto_time(rtt, PacketNumberSpace::ApplicationData) } else { self.pto_time(rtt, PacketNumberSpace::Initial) @@ -826,21 +828,17 @@ impl LossRecovery { /// When it has, mark a few packets as "lost" for the purposes of having frames /// regenerated in subsequent packets. The packets aren't truly lost, so /// we have to clone the `SentPacket` instance. - fn maybe_fire_pto(&mut self, rtt: &RttEstimate, now: Instant, lost: &mut Vec) { + fn maybe_fire_pto(&mut self, path: &PathRef, now: Instant, lost: &mut Vec) { let mut pto_space = None; // The spaces in which we will allow probing. let mut allow_probes = PacketNumberSpaceSet::default(); for pn_space in PacketNumberSpace::iter() { - if let Some(t) = self.pto_time(rtt, *pn_space) { + if let Some(t) = self.pto_time(path.borrow().rtt(), *pn_space) { allow_probes[*pn_space] = true; if t <= now { qdebug!([self], "PTO timer fired for {}", pn_space); let space = self.spaces.get_mut(*pn_space).unwrap(); - lost.extend( - space - .pto_packets(PtoState::pto_packet_count(*pn_space)) - .cloned(), - ); + lost.extend(space.pto_packets().cloned()); pto_space = pto_space.or(Some(*pn_space)); } @@ -851,6 +849,18 @@ impl LossRecovery { // pto_time to increase which might cause PTO for later packet number spaces to not fire. if let Some(pn_space) = pto_space { qtrace!([self], "PTO {}, probing {:?}", pn_space, allow_probes); + // Packets are only declared as lost relative to + // `largest_acked`. If we hit a PTO while we don't have a + // largest_acked yet, also do a congestion control reaction (because + // otherwise none would happen). + if self + .spaces + .get(pn_space) + .map_or(false, |space| space.largest_acked.is_none()) + { + path.borrow_mut() + .on_congestion_event(lost, &mut self.stats.borrow_mut()); + } self.fire_pto(pn_space, allow_probes); } } @@ -861,19 +871,20 @@ impl LossRecovery { let loss_delay = primary_path.borrow().rtt().loss_delay(); let mut lost_packets = Vec::new(); + let confirmed = self.is_confirmed(); for space in self.spaces.iter_mut() { let first = lost_packets.len(); // The first packet lost in this space. let pto = Self::pto_period_inner( primary_path.borrow().rtt(), self.pto_state.as_ref(), - space.space, + confirmed, self.fast_pto, ); space.detect_lost_packets(now, loss_delay, pto, &mut lost_packets); primary_path.borrow_mut().on_packets_lost( space.largest_acked_sent_time, - space.space, + confirmed, &lost_packets[first..], &mut self.stats.borrow_mut(), now, @@ -881,7 +892,7 @@ impl LossRecovery { } self.stats.borrow_mut().lost += lost_packets.len(); - self.maybe_fire_pto(primary_path.borrow().rtt(), now, &mut lost_packets); + self.maybe_fire_pto(primary_path, now, &mut lost_packets); lost_packets } @@ -950,7 +961,6 @@ mod tests { ecn::EcnCount, packet::{PacketNumber, PacketType}, path::{Path, PathRef}, - rtt::RttEstimate, stats::{Stats, StatsCell}, }; @@ -961,8 +971,8 @@ mod tests { const ON_SENT_SIZE: usize = 100; /// An initial RTT for using with `setup_lr`. - const TEST_RTT: Duration = ms(80); - const TEST_RTTVAR: Duration = ms(40); + const TEST_RTT: Duration = ms(7000); + const TEST_RTTVAR: Duration = ms(3500); struct Fixture { lr: LossRecovery, @@ -1033,6 +1043,7 @@ mod tests { ConnectionIdEntry::new(0, ConnectionId::from(&[1, 2, 3]), [0; 16]), ); path.set_primary(true); + path.rtt_mut().set_initial(TEST_RTT); Self { lr: LossRecovery::new(StatsCell::default(), FAST_PTO_SCALE), path: Rc::new(RefCell::new(path)), @@ -1510,13 +1521,13 @@ mod tests { ON_SENT_SIZE, )); - assert_eq!(lr.pto_time(PacketNumberSpace::ApplicationData), None); + assert!(lr.pto_time(PacketNumberSpace::ApplicationData).is_some()); lr.discard(PacketNumberSpace::Initial, pn_time(1)); - assert_eq!(lr.pto_time(PacketNumberSpace::ApplicationData), None); + assert!(lr.pto_time(PacketNumberSpace::ApplicationData).is_some()); // Expiring state after the PTO on the ApplicationData space has // expired should result in setting a PTO state. - let default_pto = RttEstimate::default().pto(PacketNumberSpace::ApplicationData); + let default_pto = lr.path.borrow().rtt().pto(true); let expected_pto = pn_time(2) + default_pto; lr.discard(PacketNumberSpace::Handshake, expected_pto); let profile = lr.send_profile(now()); @@ -1548,7 +1559,7 @@ mod tests { ON_SENT_SIZE, )); - let handshake_pto = RttEstimate::default().pto(PacketNumberSpace::Handshake); + let handshake_pto = lr.path.borrow().rtt().pto(false); let expected_pto = now() + handshake_pto; assert_eq!(lr.pto_time(PacketNumberSpace::Initial), Some(expected_pto)); let profile = lr.send_profile(now()); diff --git a/neqo-transport/src/rtt.rs b/neqo-transport/src/rtt.rs index 027b574aad..3265793c2c 100644 --- a/neqo-transport/src/rtt.rs +++ b/neqo-transport/src/rtt.rs @@ -19,7 +19,6 @@ use crate::{ qlog::{self, QlogMetric}, recovery::RecoveryToken, stats::FrameStats, - tracking::PacketNumberSpace, }; /// The smallest time that the system timer (via `sleep()`, `nanosleep()`, @@ -163,9 +162,9 @@ impl RttEstimate { self.smoothed_rtt } - pub fn pto(&self, pn_space: PacketNumberSpace) -> Duration { + pub fn pto(&self, hs_confirmed: bool) -> Duration { let mut t = self.estimate() + max(4 * self.rttvar, GRANULARITY); - if pn_space == PacketNumberSpace::ApplicationData { + if hs_confirmed { t += self.ack_delay.max(); } t diff --git a/neqo-transport/src/sender.rs b/neqo-transport/src/sender.rs index a9ead627aa..20213cc11e 100644 --- a/neqo-transport/src/sender.rs +++ b/neqo-transport/src/sender.rs @@ -113,6 +113,13 @@ impl PacketSender { self.maybe_update_pacer_mtu(); } + /// Initiate a congestion response. + /// + /// Returns true if the congestion window was reduced. + pub fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool { + self.cc.on_congestion_event(last_packet) + } + /// Called when packets are lost. Returns true if the congestion window was reduced. pub fn on_packets_lost( &mut self,