Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: don't split header and body across TCP packets #168

Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1c06435
Compilation + test using zerocopy ok
stormshield-pj50 Jul 19, 2023
39fa1e5
Fix after comments
stormshield-pj50 Sep 7, 2023
5d4e2d2
cargo fmt
stormshield-pj50 Sep 8, 2023
62002ae
WIP Minimal diff
thomaseizinger Sep 17, 2023
ee00e1d
Some basic fixes
thomaseizinger Sep 17, 2023
801bf4f
Don't use `ready!` macro with `mem::replace`
thomaseizinger Oct 5, 2023
8e11260
Inline variable
thomaseizinger Oct 5, 2023
aeb4cd2
Remove `Init` state
thomaseizinger Oct 5, 2023
f90c990
Use `?` for decoding header
thomaseizinger Oct 5, 2023
93c3834
Use ctor
thomaseizinger Oct 5, 2023
872815e
Use type-system to only allocate for data frames
thomaseizinger Oct 5, 2023
722b7c8
Don't use `cast` outside of `header` module
thomaseizinger Oct 5, 2023
91e812a
Add TODO
thomaseizinger Oct 5, 2023
c095aac
Replace header::decode() with Frame<T>::try_from_header_buffer()
pjalaber Oct 4, 2023
2cab6b4
Cargo fmt
pjalaber Oct 5, 2023
166f8ff
Reduce diff
thomaseizinger Oct 6, 2023
5c6b172
Bring back header::decode
thomaseizinger Oct 6, 2023
524994f
Reduce diff
thomaseizinger Oct 6, 2023
0108a3d
Reduce diff
thomaseizinger Oct 6, 2023
8d32d16
Resolve todo
thomaseizinger Oct 6, 2023
851341e
Reduce diff
thomaseizinger Oct 6, 2023
9b26409
Reduce diff
thomaseizinger Oct 6, 2023
3df462d
Simplify things a bit further
thomaseizinger Oct 6, 2023
09cda48
Use body_len in debug impl
thomaseizinger Oct 6, 2023
5e3e65b
Remove generic length accessor
thomaseizinger Oct 6, 2023
ab29664
Reduce diff
thomaseizinger Oct 6, 2023
b65f2bd
Ensure we check max body len before allocating
thomaseizinger Oct 9, 2023
ee9c920
Don't allocate unless necessary
thomaseizinger Oct 9, 2023
ae3bc3d
WIP: Use `AsyncWrite::poll_write_vectored`
thomaseizinger Oct 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions yamux/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ parking_lot = "0.12"
rand = "0.8.3"
static_assertions = "1"
pin-project = "1.1.0"
zerocopy = { version = "0.7.0", features = ["derive"] }

[dev-dependencies]
quickcheck = "1.0"
2 changes: 1 addition & 1 deletion yamux/src/connection/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Future for Cleanup {
type Output = ConnectionError;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.get_mut();
let this = self.get_mut();

loop {
match this.state {
Expand Down
2 changes: 1 addition & 1 deletion yamux/src/connection/closing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.get_mut();
let this = self.get_mut();

loop {
match this.state {
Expand Down
2 changes: 1 addition & 1 deletion yamux/src/connection/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ impl AsyncWrite for Stream {
let k = std::cmp::min(shared.credit as usize, buf.len());
let k = std::cmp::min(k, self.config.split_send_size);
shared.credit = shared.credit.saturating_sub(k as u32);
Vec::from(&buf[..k])
&buf[..k]
};
let n = body.len();
let mut frame = Frame::data(self.id, body).expect("body <= u32::MAX").left();
Expand Down
143 changes: 79 additions & 64 deletions yamux/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,89 +13,130 @@ mod io;

use futures::future::Either;
use header::{Data, GoAway, Header, Ping, StreamId, WindowUpdate};
use std::{convert::TryInto, num::TryFromIntError};
use std::{convert::TryInto, fmt::Debug, marker::PhantomData, num::TryFromIntError};
use zerocopy::{AsBytes, ByteSlice, ByteSliceMut, Ref};

pub use io::FrameDecodeError;
pub(crate) use io::Io;

/// A Yamux message frame consisting of header and body.
#[derive(Clone, Debug, PartialEq, Eq)]
use crate::HeaderDecodeError;

use self::header::{Flags, Tag, HEADER_SIZE};

/// A Yamux message frame consisting of header and body in a single buffer
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Frame<T> {
header: Header<T>,
body: Vec<u8>,
buffer: Vec<u8>,
_marker: PhantomData<T>,
}

impl<T> Frame<T> {
pub fn new(header: Header<T>) -> Self {
Frame {
header,
body: Vec::new(),
pub(crate) fn new(header: Header<T>) -> Self {
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
let total_buffer_size = HEADER_SIZE + header.len().val() as usize;

let mut buffer = vec![0; total_buffer_size];
header
.write_to_prefix(&mut buffer)
.expect("buffer always fits the header");

Self {
buffer,
_marker: PhantomData,
}
}

pub fn header(&self) -> &Header<T> {
&self.header
pub(crate) fn header(&self) -> &Header<T> {
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
Ref::<_, Header<T>>::new_from_prefix(self.buffer.as_slice())
.expect("buffer always holds a valid header")
.0
.into_ref()
}

pub(crate) fn header_mut(&mut self) -> &mut Header<T> {
Ref::<_, Header<T>>::new_from_prefix(self.buffer.as_mut_slice())
.expect("buffer always holds a valid header")
.0
.into_mut()
}

pub(crate) fn buffer(&self) -> &[u8] {
self.buffer.as_slice()
}

pub fn header_mut(&mut self) -> &mut Header<T> {
&mut self.header
pub(crate) fn body(&self) -> &[u8] {
&self.buffer[HEADER_SIZE..]
}

pub(crate) fn body_len(&self) -> u32 {
self.body().len() as u32
}

pub(crate) fn into_body(mut self) -> Vec<u8> {
// FIXME: Should we implement this more efficiently with `BytesMut`? I think that one would allow us to split of the body without allocating again ..
self.buffer.split_off(HEADER_SIZE)
}

pub(crate) fn body_mut(&mut self) -> &mut [u8] {
&mut self.buffer[HEADER_SIZE..]
}

/// Introduce this frame to the right of a binary frame type.
pub(crate) fn right<U>(self) -> Frame<Either<U, T>> {
Frame {
header: self.header.right(),
body: self.body,
buffer: self.buffer,
_marker: PhantomData,
}
}

/// Introduce this frame to the left of a binary frame type.
pub(crate) fn left<U>(self) -> Frame<Either<T, U>> {
Frame {
header: self.header.left(),
body: self.body,
buffer: self.buffer,
_marker: PhantomData,
}
}
}

impl<A: header::private::Sealed> From<Frame<A>> for Frame<()> {
fn from(f: Frame<A>) -> Frame<()> {
Frame {
header: f.header.into(),
body: f.body,
buffer: f.buffer,
_marker: PhantomData,
}
}
}

impl Frame<()> {
pub(crate) fn into_data(self) -> Frame<Data> {
Frame {
header: self.header.into_data(),
body: self.body,
buffer: self.buffer,
_marker: PhantomData,
}
}

pub(crate) fn into_window_update(self) -> Frame<WindowUpdate> {
Frame {
header: self.header.into_window_update(),
body: self.body,
buffer: self.buffer,
_marker: PhantomData,
}
}

pub(crate) fn into_ping(self) -> Frame<Ping> {
Frame {
header: self.header.into_ping(),
body: self.body,
buffer: self.buffer,
_marker: PhantomData,
}
}
}

impl Frame<Data> {
pub fn data(id: StreamId, b: Vec<u8>) -> Result<Self, TryFromIntError> {
Ok(Frame {
header: Header::data(id, b.len().try_into()?),
body: b,
})
pub fn data(id: StreamId, body: &[u8]) -> Result<Self, TryFromIntError> {
let header = Header::data(id, body.len().try_into()?);

let mut frame = Frame::new(header);
frame.body_mut().copy_from_slice(body);

Ok(frame)
}

pub fn close_stream(id: StreamId, ack: bool) -> Self {
Expand All @@ -107,50 +148,24 @@ impl Frame<Data> {

Frame::new(header)
}

pub fn body(&self) -> &[u8] {
&self.body
}

pub fn body_len(&self) -> u32 {
// Safe cast since we construct `Frame::<Data>`s only with
// `Vec<u8>` of length [0, u32::MAX] in `Frame::data` above.
self.body().len() as u32
}

pub fn into_body(self) -> Vec<u8> {
self.body
}
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}

impl Frame<WindowUpdate> {
pub fn window_update(id: StreamId, credit: u32) -> Self {
Frame {
header: Header::window_update(id, credit),
body: Vec::new(),
}
pub fn window_update(id: StreamId, credit: u32) -> Frame<WindowUpdate> {
Frame::new(Header::window_update(id, credit))
}
}

impl Frame<GoAway> {
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
pub fn term() -> Self {
Frame {
header: Header::term(),
body: Vec::new(),
}
pub fn term() -> Frame<GoAway> {
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
Frame::<GoAway>::new(Header::term())
}

pub fn protocol_error() -> Self {
Frame {
header: Header::protocol_error(),
body: Vec::new(),
}
pub fn protocol_error() -> Frame<GoAway> {
Frame::<GoAway>::new(Header::protocol_error())
}

pub fn internal_error() -> Self {
Frame {
header: Header::internal_error(),
body: Vec::new(),
}
pub fn internal_error() -> Frame<GoAway> {
Frame::<GoAway>::new(Header::internal_error())
}
}
Loading
Loading