From fdb5bd26e62ec135702f21022f816037d74598da Mon Sep 17 00:00:00 2001 From: kixelated Date: Fri, 2 Aug 2024 13:35:45 -0700 Subject: [PATCH] WASM improvements (#30) * Don't take self in close functions. It's a pain when Drop is used. * Fix some WASM stuff. * Some WASM improvements. * WIP * Some fixes to the read_buf API * Oops advance_mut * Remove some less useful changes. * Not needed either. * Clippy --- web-transport-wasm/Cargo.toml | 6 +- web-transport-wasm/rust-toolchain.toml | 2 - web-transport-wasm/src/error.rs | 69 +++++++++---- web-transport-wasm/src/reader.rs | 24 +++-- web-transport-wasm/src/recv.rs | 45 ++++----- web-transport-wasm/src/send.rs | 19 ++-- web-transport-wasm/src/session.rs | 129 ++++++++++++++++++++----- web-transport-wasm/src/writer.rs | 20 ++-- web-transport/Cargo.toml | 5 +- web-transport/src/quinn.rs | 84 ++++++++++------ web-transport/src/wasm.rs | 31 +++--- 11 files changed, 286 insertions(+), 148 deletions(-) delete mode 100644 web-transport-wasm/rust-toolchain.toml diff --git a/web-transport-wasm/Cargo.toml b/web-transport-wasm/Cargo.toml index d13c55c..3e96cb9 100644 --- a/web-transport-wasm/Cargo.toml +++ b/web-transport-wasm/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/kixelated/web-transport-rs" license = "MIT" -version = "0.1.1" +version = "0.2.0" edition = "2021" keywords = ["quic", "http3", "webtransport"] @@ -16,6 +16,8 @@ wasm-bindgen = "0.2" wasm-bindgen-futures = "0.4" js-sys = "0.3.69" bytes = "1" +thiserror = "1" +url = "2" [dependencies.web-sys] version = "0.3.69" @@ -24,11 +26,13 @@ features = [ "ReadableStreamDefaultReader", "ReadableStreamReadResult", "WebTransport", + "WebTransportOptions", "WebTransportBidirectionalStream", "WebTransportCloseInfo", "WebTransportSendStream", "WebTransportReceiveStream", "WebTransportDatagramDuplexStream", + "WebTransportCongestionControl", "WritableStream", "WritableStreamDefaultWriter", ] diff --git a/web-transport-wasm/rust-toolchain.toml b/web-transport-wasm/rust-toolchain.toml deleted file mode 100644 index 990104f..0000000 --- a/web-transport-wasm/rust-toolchain.toml +++ /dev/null @@ -1,2 +0,0 @@ -[toolchain] -targets = ["wasm32-unknown-unknown"] diff --git a/web-transport-wasm/src/error.rs b/web-transport-wasm/src/error.rs index 6caca26..974dc18 100644 --- a/web-transport-wasm/src/error.rs +++ b/web-transport-wasm/src/error.rs @@ -1,34 +1,61 @@ -use std::{error, fmt}; +use wasm_bindgen::prelude::*; -use wasm_bindgen::JsValue; +#[derive(Clone, Debug, thiserror::Error)] +#[error("web error: {0:?}")] +pub struct WebError(js_sys::Error); -#[derive(Debug)] -pub struct WebError { - value: JsValue, +impl From for WebError { + fn from(e: js_sys::Error) -> Self { + Self(e) + } } -impl From for WebError { - fn from(value: JsValue) -> Self { - Self { value } +impl From for WebError { + fn from(e: wasm_bindgen::JsValue) -> Self { + Self(e.into()) } } -impl error::Error for WebError {} +pub trait WebErrorExt { + fn throw(self) -> Result; +} -impl fmt::Display for WebError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // Print out the JsValue as a string - match self.value.as_string() { - Some(s) => write!(f, "{}", s), - None => write!(f, "{:?}", self.value), - } +impl> WebErrorExt for Result { + fn throw(self) -> Result { + self.map_err(Into::into) } } -impl From<&str> for WebError { - fn from(value: &str) -> Self { - Self { - value: value.into(), - } +#[derive(Clone, Debug, thiserror::Error)] +#[error("read error: {0:?}")] +pub struct ReadError(#[from] WebError); + +#[derive(Clone, Debug, thiserror::Error)] +#[error("write error: {0:?}")] +pub struct WriteError(#[from] WebError); + +#[derive(Clone, Debug, thiserror::Error)] +pub enum SessionError { + // TODO distinguish between different kinds of errors + #[error("read error: {0}")] + Read(#[from] ReadError), + + #[error("write error: {0}")] + Write(#[from] WriteError), + + #[error("web error: {0}")] + Web(#[from] WebError), +} + +pub(crate) trait PromiseExt { + fn ignore(self); +} + +impl PromiseExt for js_sys::Promise { + // Ignore the result of the promise by using an empty catch. + fn ignore(self) { + let closure = Closure::wrap(Box::new(|_: JsValue| {}) as Box); + let _ = self.catch(&closure); + closure.forget(); } } diff --git a/web-transport-wasm/src/reader.rs b/web-transport-wasm/src/reader.rs index 4043b65..450aff0 100644 --- a/web-transport-wasm/src/reader.rs +++ b/web-transport-wasm/src/reader.rs @@ -1,9 +1,9 @@ use js_sys::Reflect; -use wasm_bindgen::{JsCast, JsValue}; +use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; use web_sys::{ReadableStream, ReadableStreamDefaultReader, ReadableStreamReadResult}; -use crate::WebError; +use crate::{PromiseExt, ReadError, WebErrorExt}; // Wrapper around ReadableStream pub struct Reader { @@ -11,31 +11,35 @@ pub struct Reader { } impl Reader { - pub fn new(stream: &ReadableStream) -> Result { + pub fn new(stream: &ReadableStream) -> Result { let inner = stream.get_reader().unchecked_into(); Ok(Self { inner }) } - pub async fn read(&mut self) -> Result, WebError> { - let result: ReadableStreamReadResult = JsFuture::from(self.inner.read()).await?.into(); + pub async fn read(&mut self) -> Result, ReadError> { + let result: ReadableStreamReadResult = + JsFuture::from(self.inner.read()).await.throw()?.into(); - if Reflect::get(&result, &"done".into())?.is_truthy() { + if Reflect::get(&result, &"done".into()).throw()?.is_truthy() { return Ok(None); } - let res = Reflect::get(&result, &"value".into())?.dyn_into()?; + let res = Reflect::get(&result, &"value".into()) + .throw()? + .unchecked_into(); + Ok(Some(res)) } - pub fn close(self, reason: &str) { + pub fn close(&mut self, reason: &str) { let str = JsValue::from_str(reason); - let _ = self.inner.cancel_with_reason(&str); // ignore the promise + self.inner.cancel_with_reason(&str).ignore(); } } impl Drop for Reader { fn drop(&mut self) { - let _ = self.inner.cancel(); // ignore the promise + self.inner.cancel().ignore(); self.inner.release_lock(); } } diff --git a/web-transport-wasm/src/recv.rs b/web-transport-wasm/src/recv.rs index aee5859..a8c6748 100644 --- a/web-transport-wasm/src/recv.rs +++ b/web-transport-wasm/src/recv.rs @@ -4,7 +4,7 @@ use bytes::{BufMut, Bytes, BytesMut}; use js_sys::Uint8Array; use web_sys::WebTransportReceiveStream; -use crate::{Reader, WebError}; +use crate::{ReadError, Reader}; pub struct RecvStream { reader: Reader, @@ -12,11 +12,7 @@ pub struct RecvStream { } impl RecvStream { - pub fn new(stream: WebTransportReceiveStream) -> Result { - if stream.locked() { - return Err("locked".into()); - } - + pub(super) fn new(stream: WebTransportReceiveStream) -> Result { let reader = Reader::new(&stream)?; Ok(Self { @@ -25,25 +21,30 @@ impl RecvStream { }) } - pub async fn read(&mut self, buf: &mut [u8]) -> Result, WebError> { - Ok(self.read_chunk(buf.len()).await?.map(|chunk| { - let size = chunk.len(); - buf[..size].copy_from_slice(&chunk); - size - })) + pub async fn read(&mut self, buf: &mut [u8]) -> Result, ReadError> { + let chunk = match self.read_chunk(buf.len()).await? { + Some(chunk) => chunk, + None => return Ok(None), + }; + + let size = chunk.len(); + buf[..size].copy_from_slice(&chunk); + Ok(Some(size)) } - pub async fn read_buf(&mut self, buf: &mut B) -> Result { - Ok(match self.read_chunk(buf.remaining_mut()).await? { - Some(chunk) => { - buf.put(chunk); - true - } - None => false, - }) + pub async fn read_buf(&mut self, buf: &mut B) -> Result, ReadError> { + let chunk = match self.read_chunk(buf.remaining_mut()).await? { + Some(chunk) => chunk, + None => return Ok(None), + }; + + let size = chunk.len(); + buf.put(chunk); + + Ok(Some(size)) } - pub async fn read_chunk(&mut self, max: usize) -> Result, WebError> { + pub async fn read_chunk(&mut self, max: usize) -> Result, ReadError> { if !self.buffer.is_empty() { let size = cmp::min(max, self.buffer.len()); let data = self.buffer.split_to(size).freeze(); @@ -64,7 +65,7 @@ impl RecvStream { Ok(Some(data)) } - pub fn stop(self, reason: &str) { + pub fn stop(&mut self, reason: &str) { self.reader.close(reason); } } diff --git a/web-transport-wasm/src/send.rs b/web-transport-wasm/src/send.rs index 10f98c3..9fe2f4c 100644 --- a/web-transport-wasm/src/send.rs +++ b/web-transport-wasm/src/send.rs @@ -2,7 +2,7 @@ use bytes::{Buf, Bytes}; use js_sys::{Reflect, Uint8Array}; use web_sys::WebTransportSendStream; -use crate::{WebError, Writer}; +use crate::{WriteError, Writer}; pub struct SendStream { stream: WebTransportSendStream, @@ -10,27 +10,28 @@ pub struct SendStream { } impl SendStream { - pub fn new(stream: WebTransportSendStream) -> Result { + pub(super) fn new(stream: WebTransportSendStream) -> Result { let writer = Writer::new(&stream)?; Ok(Self { stream, writer }) } - pub async fn write(&mut self, buf: &[u8]) -> Result { + pub async fn write(&mut self, buf: &[u8]) -> Result { self.writer.write(&Uint8Array::from(buf)).await?; Ok(buf.len()) } - pub async fn write_buf(&mut self, buf: &mut B) -> Result { - let chunk = buf.chunk(); - self.writer.write(&Uint8Array::from(chunk)).await?; - Ok(chunk.len()) + pub async fn write_buf(&mut self, buf: &mut B) -> Result { + let size = self.write(buf.chunk()).await?; + buf.advance(size); + + Ok(size) } - pub async fn write_chunk(&mut self, buf: Bytes) -> Result<(), WebError> { + pub async fn write_chunk(&mut self, buf: Bytes) -> Result<(), WriteError> { self.write(&buf).await.map(|_| ()) } - pub fn reset(self, reason: &str) { + pub fn reset(&mut self, reason: &str) { self.writer.close(reason); } diff --git a/web-transport-wasm/src/session.rs b/web-transport-wasm/src/session.rs index 799c66d..25be526 100644 --- a/web-transport-wasm/src/session.rs +++ b/web-transport-wasm/src/session.rs @@ -1,35 +1,37 @@ use bytes::Bytes; -use js_sys::Uint8Array; -use wasm_bindgen::JsCast; +use js_sys::{Object, Reflect, Uint8Array}; +use url::Url; use wasm_bindgen_futures::JsFuture; use web_sys::{ - WebTransportBidirectionalStream, WebTransportCloseInfo, WebTransportReceiveStream, + WebTransport, WebTransportBidirectionalStream, WebTransportCloseInfo, + WebTransportCongestionControl, WebTransportOptions, WebTransportReceiveStream, WebTransportSendStream, }; -use crate::{Reader, RecvStream, SendStream, WebError, Writer}; +use crate::{Reader, RecvStream, SendStream, SessionError, WebErrorExt, Writer}; #[derive(Clone)] pub struct Session { - inner: web_sys::WebTransport, + inner: WebTransport, } impl Session { - pub async fn new(url: &str) -> Result { - let inner = web_sys::WebTransport::new(url)?; - JsFuture::from(inner.ready()).await?; + pub fn create(url: Url) -> SessionBuilder { + SessionBuilder::new(url) + } - Ok(Self { inner }) + pub async fn connect(url: Url) -> Result { + Self::create(url).connect().await } - pub async fn accept_uni(&mut self) -> Result { + pub async fn accept_uni(&mut self) -> Result { let mut reader = Reader::new(&self.inner.incoming_unidirectional_streams())?; let stream: WebTransportReceiveStream = reader.read().await?.expect("closed without error"); let recv = RecvStream::new(stream)?; Ok(recv) } - pub async fn accept_bi(&mut self) -> Result<(SendStream, RecvStream), WebError> { + pub async fn accept_bi(&mut self) -> Result<(SendStream, RecvStream), SessionError> { let mut reader = Reader::new(&self.inner.incoming_bidirectional_streams())?; let stream: WebTransportBidirectionalStream = reader.read().await?.expect("closed without error"); @@ -40,11 +42,12 @@ impl Session { Ok((send, recv)) } - pub async fn open_bi(&mut self) -> Result<(SendStream, RecvStream), WebError> { + pub async fn open_bi(&mut self) -> Result<(SendStream, RecvStream), SessionError> { let stream: WebTransportBidirectionalStream = JsFuture::from(self.inner.create_bidirectional_stream()) - .await? - .dyn_into()?; + .await + .throw()? + .into(); let send = SendStream::new(stream.writable())?; let recv = RecvStream::new(stream.readable())?; @@ -52,37 +55,117 @@ impl Session { Ok((send, recv)) } - pub async fn open_uni(&mut self) -> Result { + pub async fn open_uni(&mut self) -> Result { let stream: WebTransportSendStream = JsFuture::from(self.inner.create_unidirectional_stream()) - .await? - .dyn_into()?; + .await + .throw()? + .into(); let send = SendStream::new(stream)?; Ok(send) } - pub async fn send_datagram(&mut self, payload: Bytes) -> Result<(), WebError> { + pub async fn send_datagram(&mut self, payload: Bytes) -> Result<(), SessionError> { let mut writer = Writer::new(&self.inner.datagrams().writable())?; writer.write(&Uint8Array::from(payload.as_ref())).await?; Ok(()) } - pub async fn recv_datagram(&mut self) -> Result { + pub async fn recv_datagram(&mut self) -> Result { let mut reader = Reader::new(&self.inner.datagrams().readable())?; let data: Uint8Array = reader.read().await?.unwrap_or_default(); Ok(data.to_vec().into()) } - pub fn close(self, code: u32, reason: &str) { + pub fn close(&mut self, code: u32, reason: &str) { let mut info = WebTransportCloseInfo::new(); info.close_code(code); info.reason(reason); self.inner.close_with_close_info(&info); } - pub async fn closed(&self) -> WebError { - let err = JsFuture::from(self.inner.closed()).await.unwrap(); - WebError::from(err) + pub async fn closed(&self) -> Result { + let result: js_sys::Object = JsFuture::from(self.inner.closed()).await.throw()?.into(); + + // For some reason, WebTransportCloseInfo only contains setters + let info = Closed { + code: Reflect::get(&result, &"closeCode".into()) + .throw()? + .as_f64() + .unwrap() as u32, + reason: Reflect::get(&result, &"reason".into()) + .throw()? + .as_string() + .unwrap(), + }; + + Ok(info) + } +} + +pub struct SessionBuilder { + url: Url, + options: WebTransportOptions, +} + +// Check https://rustwasm.github.io/wasm-bindgen/api/web_sys/struct.WebTransportOptions.html +impl SessionBuilder { + pub fn new(url: Url) -> Self { + Self { + url, + options: WebTransportOptions::new(), + } + } + + /// Determine if the client/server is allowed to pool connections. + /// (Hint) Don't set it to true. + pub fn allow_pooling(mut self, val: bool) -> Self { + self.options.allow_pooling(val); + self + } + + /// Determine if HTTP/2 is a valid fallback. + pub fn require_unreliable(mut self, val: bool) -> Self { + self.options.require_unreliable(val); + self + } + + /// Hint at the required congestion control algorithm + pub fn congestion_control(mut self, control: CongestionControl) -> Self { + self.options.congestion_control(control); + self + } + + /// Supply sha256 hashes for accepted certificates, instead of using a root CA + pub fn server_certificate_hashes(mut self, hashes: Vec>) -> Self { + // expected: [ { algorithm: "sha-256", value: hashValue }, ... ] + let hashes = hashes + .into_iter() + .map(|hash| { + let hash = Uint8Array::from(&hash[..]); + let obj = Object::new(); + Reflect::set(&obj, &"algorithm".into(), &"sha-256".into()).unwrap(); + Reflect::set(&obj, &"value".into(), &hash.into()).unwrap(); + obj + }) + .collect::(); + + self.options.server_certificate_hashes(&hashes); + self + } + + pub async fn connect(self) -> Result { + let inner = WebTransport::new_with_options(self.url.as_ref(), &self.options).throw()?; + JsFuture::from(inner.ready()).await.throw()?; + + Ok(Session { inner }) } } + +pub struct Closed { + pub code: u32, + pub reason: String, +} + +pub type CongestionControl = WebTransportCongestionControl; diff --git a/web-transport-wasm/src/writer.rs b/web-transport-wasm/src/writer.rs index 7c7ef51..9e18f81 100644 --- a/web-transport-wasm/src/writer.rs +++ b/web-transport-wasm/src/writer.rs @@ -1,8 +1,8 @@ -use wasm_bindgen::{JsCast, JsValue}; +use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; use web_sys::{WritableStream, WritableStreamDefaultWriter}; -use crate::WebError; +use crate::{PromiseExt, WebErrorExt, WriteError}; // Wrapper around WritableStream pub struct Writer { @@ -10,25 +10,27 @@ pub struct Writer { } impl Writer { - pub fn new(stream: &WritableStream) -> Result { - let inner = stream.get_writer()?.unchecked_into(); + pub fn new(stream: &WritableStream) -> Result { + let inner = stream.get_writer().throw()?.unchecked_into(); Ok(Self { inner }) } - pub async fn write(&mut self, v: &JsValue) -> Result<(), WebError> { - JsFuture::from(self.inner.write_with_chunk(v)).await?; + pub async fn write(&mut self, v: &JsValue) -> Result<(), WriteError> { + JsFuture::from(self.inner.write_with_chunk(v)) + .await + .throw()?; Ok(()) } - pub fn close(self, reason: &str) { + pub fn close(&mut self, reason: &str) { let str = JsValue::from_str(reason); - let _ = self.inner.abort_with_reason(&str); // ignore the promise + self.inner.abort_with_reason(&str).ignore(); } } impl Drop for Writer { fn drop(&mut self) { - let _ = self.inner.close(); // ignore the promise + self.inner.close().ignore(); self.inner.release_lock(); } } diff --git a/web-transport/Cargo.toml b/web-transport/Cargo.toml index 8192901..d992779 100644 --- a/web-transport/Cargo.toml +++ b/web-transport/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley"] repository = "https://github.com/kixelated/web-transport-rs" license = "MIT" -version = "0.3.1" +version = "0.4.0" edition = "2021" keywords = ["quic", "http3", "webtransport"] @@ -15,10 +15,9 @@ categories = ["network-programming", "web-programming"] [dependencies] bytes = "1" -thiserror = "1" [target.'cfg(target_arch = "wasm32")'.dependencies] -web-transport-wasm = { version = "0.1", path = "../web-transport-wasm" } +web-transport-wasm = { version = "0.2", path = "../web-transport-wasm" } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] web-transport-quinn = { version = "0.3", path = "../web-transport-quinn" } diff --git a/web-transport/src/quinn.rs b/web-transport/src/quinn.rs index c9e2473..ce83559 100644 --- a/web-transport/src/quinn.rs +++ b/web-transport/src/quinn.rs @@ -1,37 +1,42 @@ use bytes::{Buf, BufMut, Bytes}; +// Export the Quinn implementation to simplify Cargo.toml +pub use web_transport_quinn as quinn; + /// A WebTransport Session, able to accept/create streams and send/recv datagrams. /// /// The session can be cloned to create multiple handles. /// The session will be closed with on drop. #[derive(Clone)] -pub struct Session(web_transport_quinn::Session); +pub struct Session { + inner: web_transport_quinn::Session, +} impl Session { /// Block until the peer creates a new unidirectional stream. pub async fn accept_uni(&mut self) -> Result { - self.0.accept_uni().await.map(RecvStream) + self.inner.accept_uni().await.map(RecvStream::new) } /// Block until the peer creates a new bidirectional stream. pub async fn accept_bi(&mut self) -> Result<(SendStream, RecvStream), SessionError> { - self.0 + self.inner .accept_bi() .await - .map(|(s, r)| (SendStream(s), RecvStream(r))) + .map(|(s, r)| (SendStream::new(s), RecvStream::new(r))) } /// Open a new bidirectional stream, which may block when there are too many concurrent streams. pub async fn open_bi(&mut self) -> Result<(SendStream, RecvStream), SessionError> { - self.0 + self.inner .open_bi() .await - .map(|(s, r)| (SendStream(s), RecvStream(r))) + .map(|(s, r)| (SendStream::new(s), RecvStream::new(r))) } /// Open a new unidirectional stream, which may block when there are too many concurrent streams. pub async fn open_uni(&mut self) -> Result { - self.0.open_uni().await.map(SendStream) + self.inner.open_uni().await.map(SendStream::new) } /// Send a datagram over the network. @@ -45,34 +50,35 @@ impl Session { /// - ??? pub async fn send_datagram(&mut self, payload: Bytes) -> Result<(), SessionError> { // NOTE: This is not async, but we need to make it async to match the wasm implementation. - self.0.send_datagram(payload) + self.inner.send_datagram(payload) } /// The maximum size of a datagram that can be sent. pub async fn max_datagram_size(&self) -> usize { - self.0.max_datagram_size() + self.inner.max_datagram_size() } /// Receive a datagram over the network. pub async fn recv_datagram(&mut self) -> Result { - self.0.read_datagram().await + self.inner.read_datagram().await } /// Close the connection immediately with a code and reason. - pub fn close(self, code: u32, reason: &str) { - self.0.close(code, reason.as_bytes()) + pub fn close(&mut self, code: u32, reason: &str) { + self.inner.close(code, reason.as_bytes()) } /// Block until the connection is closed. - pub async fn closed(&self) -> SessionError { - self.0.closed().await + pub async fn closed(&self) -> Result<(), SessionError> { + // TODO correctly parse the code/reason + Err(self.inner.closed().await) } } /// Convert a `web_transport_quinn::Session` into a `web_transport::Session`. impl From for Session { fn from(session: web_transport_quinn::Session) -> Self { - Session(session) + Session { inner: session } } } @@ -80,17 +86,23 @@ impl From for Session { /// /// QUIC streams have flow control, which means the send rate is limited by the peer's receive window. /// The stream will be closed with a graceful FIN when dropped. -pub struct SendStream(web_transport_quinn::SendStream); +pub struct SendStream { + inner: web_transport_quinn::SendStream, +} impl SendStream { + fn new(inner: web_transport_quinn::SendStream) -> Self { + Self { inner } + } + /// Write some of the buffer to the stream, potentailly blocking on flow control. pub async fn write(&mut self, buf: &[u8]) -> Result { - self.0.write(buf).await + self.inner.write(buf).await } /// Write some of the given buffer to the stream, potentially blocking on flow control. pub async fn write_buf(&mut self, buf: &mut B) -> Result { - let size = self.0.write(buf.chunk()).await?; + let size = self.inner.write(buf.chunk()).await?; buf.advance(size); Ok(size) } @@ -99,19 +111,19 @@ impl SendStream { /// /// More efficient for some implementations, as it avoids a copy pub async fn write_chunk(&mut self, buf: Bytes) -> Result<(), WriteError> { - self.0.write_chunk(buf).await + self.inner.write_chunk(buf).await } /// Set the stream's priority. /// /// Streams with lower values will be sent first, but are not guaranteed to arrive first. pub fn set_priority(&mut self, order: i32) { - self.0.set_priority(order).ok(); + self.inner.set_priority(order).ok(); } /// Send an immediate reset code, closing the stream. - pub fn reset(mut self, code: u32) { - self.0.reset(code).ok(); + pub fn reset(&mut self, code: u32) { + self.inner.reset(code).ok(); } } @@ -119,39 +131,49 @@ impl SendStream { /// /// All bytes are flushed in order and the stream is flow controlled. /// The stream will be closed with STOP_SENDING code=0 when dropped. -pub struct RecvStream(web_transport_quinn::RecvStream); +pub struct RecvStream { + inner: web_transport_quinn::RecvStream, +} impl RecvStream { + fn new(inner: web_transport_quinn::RecvStream) -> Self { + Self { inner } + } + /// Read some data into the provided buffer. pub async fn read(&mut self, buf: &mut [u8]) -> Result, ReadError> { - self.0.read(buf).await + self.inner.read(buf).await } /// Read some data into the provided buffer. - pub async fn read_buf(&mut self, buf: &mut B) -> Result { + pub async fn read_buf(&mut self, buf: &mut B) -> Result, ReadError> { let dst = buf.chunk_mut(); let dst = unsafe { &mut *(dst as *mut _ as *mut [u8]) }; - let size = match self.0.read(dst).await? { + let size = match self.inner.read(dst).await? { Some(size) => size, - None => return Ok(false), + None => return Ok(None), }; unsafe { buf.advance_mut(size) }; - Ok(true) + Ok(Some(size)) } /// Read the next chunk of data with the provided maximum size. /// /// More efficient for some implementations, as it avoids a copy pub async fn read_chunk(&mut self, max: usize) -> Result, ReadError> { - Ok(self.0.read_chunk(max, true).await?.map(|chunk| chunk.bytes)) + Ok(self + .inner + .read_chunk(max, true) + .await? + .map(|chunk| chunk.bytes)) } /// Send a `STOP_SENDING` QUIC code. - pub fn stop(mut self, code: u32) { - self.0.stop(code).ok(); + pub fn stop(&mut self, code: u32) { + self.inner.stop(code).ok(); } } diff --git a/web-transport/src/wasm.rs b/web-transport/src/wasm.rs index a98c28c..4b149c1 100644 --- a/web-transport/src/wasm.rs +++ b/web-transport/src/wasm.rs @@ -1,5 +1,8 @@ use bytes::{Buf, BufMut, Bytes}; +// Export the Wasm implementation to simplify Cargo.toml +pub use web_transport_wasm as wasm; + #[derive(Clone)] pub struct Session(web_transport_wasm::Session); @@ -33,12 +36,12 @@ impl Session { } /// Close the connection immediately - pub fn close(self, code: u32, reason: &str) { + pub fn close(&mut self, code: u32, reason: &str) { self.0.close(code, reason) } - pub async fn closed(&self) -> SessionError { - self.0.closed().await.into() + pub async fn closed(&self) -> Result { + self.0.closed().await } /// Send a datagram. @@ -51,6 +54,8 @@ impl Session { } } +pub type Closed = web_transport_wasm::Closed; + impl From for Session { fn from(session: web_transport_wasm::Session) -> Self { Session(session) @@ -80,7 +85,7 @@ impl SendStream { } /// Send a QUIC reset code. - pub fn reset(self, code: u32) { + pub fn reset(&mut self, code: u32) { self.0.reset(&code.to_string()) } } @@ -93,7 +98,7 @@ impl RecvStream { } /// Attempt to read from the stream into the given buffer. - pub async fn read_buf(&mut self, buf: &mut B) -> Result { + pub async fn read_buf(&mut self, buf: &mut B) -> Result, ReadError> { self.0.read_buf(buf).await.map_err(Into::into) } @@ -103,19 +108,11 @@ impl RecvStream { } /// Send a `STOP_SENDING` QUIC code. - pub fn stop(self, code: u32) { + pub fn stop(&mut self, code: u32) { self.0.stop(&code.to_string()) } } -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub struct SessionError(#[from] web_transport_wasm::WebError); - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub struct WriteError(#[from] web_transport_wasm::WebError); - -#[derive(Debug, thiserror::Error)] -#[error(transparent)] -pub struct ReadError(#[from] web_transport_wasm::WebError); +pub type SessionError = web_transport_wasm::SessionError; +pub type WriteError = web_transport_wasm::WriteError; +pub type ReadError = web_transport_wasm::ReadError;