Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: don't split header and body across TCP packets #168

Closed
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1c06435
Compilation + test using zerocopy ok
stormshield-pj50 Jul 19, 2023
39fa1e5
Fix after comments
stormshield-pj50 Sep 7, 2023
5d4e2d2
cargo fmt
stormshield-pj50 Sep 8, 2023
62002ae
WIP Minimal diff
thomaseizinger Sep 17, 2023
ee00e1d
Some basic fixes
thomaseizinger Sep 17, 2023
801bf4f
Don't use `ready!` macro with `mem::replace`
thomaseizinger Oct 5, 2023
8e11260
Inline variable
thomaseizinger Oct 5, 2023
aeb4cd2
Remove `Init` state
thomaseizinger Oct 5, 2023
f90c990
Use `?` for decoding header
thomaseizinger Oct 5, 2023
93c3834
Use ctor
thomaseizinger Oct 5, 2023
872815e
Use type-system to only allocate for data frames
thomaseizinger Oct 5, 2023
722b7c8
Don't use `cast` outside of `header` module
thomaseizinger Oct 5, 2023
91e812a
Add TODO
thomaseizinger Oct 5, 2023
c095aac
Replace header::decode() with Frame<T>::try_from_header_buffer()
pjalaber Oct 4, 2023
2cab6b4
Cargo fmt
pjalaber Oct 5, 2023
166f8ff
Reduce diff
thomaseizinger Oct 6, 2023
5c6b172
Bring back header::decode
thomaseizinger Oct 6, 2023
524994f
Reduce diff
thomaseizinger Oct 6, 2023
0108a3d
Reduce diff
thomaseizinger Oct 6, 2023
8d32d16
Resolve todo
thomaseizinger Oct 6, 2023
851341e
Reduce diff
thomaseizinger Oct 6, 2023
9b26409
Reduce diff
thomaseizinger Oct 6, 2023
3df462d
Simplify things a bit further
thomaseizinger Oct 6, 2023
09cda48
Use body_len in debug impl
thomaseizinger Oct 6, 2023
5e3e65b
Remove generic length accessor
thomaseizinger Oct 6, 2023
ab29664
Reduce diff
thomaseizinger Oct 6, 2023
b65f2bd
Ensure we check max body len before allocating
thomaseizinger Oct 9, 2023
ee9c920
Don't allocate unless necessary
thomaseizinger Oct 9, 2023
ae3bc3d
WIP: Use `AsyncWrite::poll_write_vectored`
thomaseizinger Oct 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions yamux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ parking_lot = "0.12"
rand = "0.8.3"
static_assertions = "1"
pin-project = "1.1.0"
zerocopy = { version = "0.7.0", features = ["derive"] }

[dev-dependencies]
quickcheck = "1.0"
futures = "0.3.4"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this dev-dependency needed? futures is already imported above using 0.3.12.

4 changes: 2 additions & 2 deletions yamux/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
}
}

fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
fn poll_new_outbound(&mut self, cx: &Context<'_>) -> Poll<Result<Stream>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why diverge from the Future signature here? In other words, why remove the mut?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Triggered by a clippy lint, we can put it back but given that it is an internal API, I wouldn't bother.

if self.streams.len() >= self.config.max_num_streams {
log::error!("{}: maximum number of streams reached", self.id);
return Poll::Ready(Err(ConnectionError::TooManyStreams));
Expand Down Expand Up @@ -880,7 +880,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
if stream_id == CONNECTION_ID || self.streams.contains_key(&stream_id) {
let mut hdr = Header::ping(frame.header().nonce());
hdr.ack();
return Action::Ping(Frame::new(hdr));
return Action::Ping(Frame::no_body(hdr));
}
log::trace!(
"{}/{}: ping for unknown stream, possibly dropped earlier: {:?}",
Expand Down
2 changes: 1 addition & 1 deletion yamux/src/connection/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Future for Cleanup {
type Output = ConnectionError;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.get_mut();
let this = self.get_mut();

loop {
match this.state {
Expand Down
2 changes: 1 addition & 1 deletion yamux/src/connection/closing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.get_mut();
let this = self.get_mut();

loop {
match this.state {
Expand Down
2 changes: 1 addition & 1 deletion yamux/src/connection/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ impl AsyncWrite for Stream {
let k = std::cmp::min(shared.credit as usize, buf.len());
let k = std::cmp::min(k, self.config.split_send_size);
shared.credit = shared.credit.saturating_sub(k as u32);
Vec::from(&buf[..k])
&buf[..k]
};
let n = body.len();
let mut frame = Frame::data(self.id, body).expect("body <= u32::MAX").left();
Expand Down
144 changes: 95 additions & 49 deletions yamux/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,89 +13,144 @@ mod io;

use futures::future::Either;
use header::{Data, GoAway, Header, Ping, StreamId, WindowUpdate};
use std::{convert::TryInto, num::TryFromIntError};
use std::{convert::TryInto, fmt::Debug, marker::PhantomData, num::TryFromIntError};
use zerocopy::{AsBytes, Ref};

pub use io::FrameDecodeError;
pub(crate) use io::Io;

/// A Yamux message frame consisting of header and body.
#[derive(Clone, Debug, PartialEq, Eq)]
use self::header::HEADER_SIZE;

/// A Yamux message frame consisting of a single buffer with header followed by body.
/// The header can be zerocopy parsed into a Header struct by calling header()/header_mut().
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// The header can be zerocopy parsed into a Header struct by calling header()/header_mut().
/// The header can be zerocopy parsed into a Header struct by calling `Header::header` and `Header::header_mut`.

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Frame<T> {
header: Header<T>,
body: Vec<u8>,
buffer: Vec<u8>,
_marker: PhantomData<T>,
}

impl<T> Frame<T> {
pub fn new(header: Header<T>) -> Self {
Frame {
header,
body: Vec::new(),
pub(crate) fn no_body(header: Header<T>) -> Self {
let mut buffer = vec![0; HEADER_SIZE];
header
.write_to(&mut buffer)
.expect("buffer is size of header");

Self {
buffer,
_marker: PhantomData,
}
}

pub fn header(&self) -> &Header<T> {
&self.header
Ref::<_, Header<T>>::new_from_prefix(self.buffer.as_slice())
.expect("buffer always holds a valid header")
.0
.into_ref()
}

pub fn header_mut(&mut self) -> &mut Header<T> {
&mut self.header
Ref::<_, Header<T>>::new_from_prefix(self.buffer.as_mut_slice())
.expect("buffer always holds a valid header")
.0
.into_mut()
}

pub(crate) fn buffer(&self) -> &[u8] {
self.buffer.as_slice()
}

/// Introduce this frame to the right of a binary frame type.
pub(crate) fn right<U>(self) -> Frame<Either<U, T>> {
Frame {
header: self.header.right(),
body: self.body,
buffer: self.buffer,
_marker: PhantomData,
}
}

/// Introduce this frame to the left of a binary frame type.
pub(crate) fn left<U>(self) -> Frame<Either<T, U>> {
Frame {
header: self.header.left(),
body: self.body,
buffer: self.buffer,
_marker: PhantomData,
}
}

pub(crate) fn into_generic_frame(self) -> Frame<()> {
Frame {
buffer: self.buffer,
_marker: PhantomData,
}
}
}

impl<A: header::private::Sealed> From<Frame<A>> for Frame<()> {
fn from(f: Frame<A>) -> Frame<()> {
Frame {
header: f.header.into(),
body: f.body,
buffer: f.buffer,
_marker: PhantomData,
}
}
}

impl Frame<()> {
pub(crate) fn try_from_header_buffer(
buffer: &[u8; HEADER_SIZE],
) -> Result<Either<Frame<()>, Frame<Data>>, FrameDecodeError> {
let header = header::decode(buffer)?;

let either = match header.try_into_data() {
Ok(data) => Either::Right(Frame::new(data)),
Err(other) => Either::Left(Frame::no_body(other)),
};

Ok(either)
}

pub(crate) fn into_data(self) -> Frame<Data> {
Frame {
header: self.header.into_data(),
body: self.body,
buffer: self.buffer,
_marker: PhantomData,
}
}

pub(crate) fn into_window_update(self) -> Frame<WindowUpdate> {
Frame {
header: self.header.into_window_update(),
body: self.body,
buffer: self.buffer,
_marker: PhantomData,
}
}

pub(crate) fn into_ping(self) -> Frame<Ping> {
Frame {
header: self.header.into_ping(),
body: self.body,
buffer: self.buffer,
_marker: PhantomData,
}
}
}

impl Frame<Data> {
pub fn data(id: StreamId, b: Vec<u8>) -> Result<Self, TryFromIntError> {
Ok(Frame {
header: Header::data(id, b.len().try_into()?),
body: b,
})
pub fn data(id: StreamId, b: &[u8]) -> Result<Self, TryFromIntError> {
let header = Header::data(id, b.len().try_into()?);

let mut frame = Frame::new(header);
frame.body_mut().copy_from_slice(b);

Ok(frame)
}

pub fn new(header: Header<Data>) -> Self {
let total_buffer_size = HEADER_SIZE + header.body_len();

let mut buffer = vec![0; total_buffer_size];
header
.write_to_prefix(&mut buffer)
.expect("buffer always fits the header");

Self {
buffer,
_marker: PhantomData,
}
}

pub fn close_stream(id: StreamId, ack: bool) -> Self {
Expand All @@ -109,48 +164,39 @@ impl Frame<Data> {
}

pub fn body(&self) -> &[u8] {
&self.body
&self.buffer[HEADER_SIZE..]
}

pub fn body_mut(&mut self) -> &mut [u8] {
&mut self.buffer[HEADER_SIZE..]
}

pub fn body_len(&self) -> u32 {
// Safe cast since we construct `Frame::<Data>`s only with
// `Vec<u8>` of length [0, u32::MAX] in `Frame::data` above.
self.body().len() as u32
}

pub fn into_body(self) -> Vec<u8> {
self.body
pub fn into_body(mut self) -> Vec<u8> {
// FIXME: Should we implement this more efficiently with `BytesMut`? I think that one would allow us to split of the body without allocating again ..
self.buffer.split_off(HEADER_SIZE)
}
}

impl Frame<WindowUpdate> {
pub fn window_update(id: StreamId, credit: u32) -> Self {
Frame {
header: Header::window_update(id, credit),
body: Vec::new(),
}
Frame::no_body(Header::window_update(id, credit))
}
}

impl Frame<GoAway> {
pub fn term() -> Self {
Frame {
header: Header::term(),
body: Vec::new(),
}
Frame::<GoAway>::no_body(Header::term())
}

pub fn protocol_error() -> Self {
Frame {
header: Header::protocol_error(),
body: Vec::new(),
}
Frame::<GoAway>::no_body(Header::protocol_error())
}

pub fn internal_error() -> Self {
Frame {
header: Header::internal_error(),
body: Vec::new(),
}
Frame::<GoAway>::no_body(Header::internal_error())
}
}
Loading