Skip to content

Commit

Permalink
WASM improvements (#30)
Browse files Browse the repository at this point in the history
* Don't take self in close functions.

It's a pain when Drop is used.

* Fix some WASM stuff.

* Some WASM improvements.

* WIP

* Some fixes to the read_buf API

* Oops advance_mut

* Remove some less useful changes.

* Not needed either.

* Clippy
  • Loading branch information
kixelated authored Aug 2, 2024
1 parent 463dd22 commit fdb5bd2
Show file tree
Hide file tree
Showing 11 changed files with 286 additions and 148 deletions.
6 changes: 5 additions & 1 deletion web-transport-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["Luke Curley"]
repository = "https://github.com/kixelated/web-transport-rs"
license = "MIT"

version = "0.1.1"
version = "0.2.0"
edition = "2021"

keywords = ["quic", "http3", "webtransport"]
Expand All @@ -16,6 +16,8 @@ wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
js-sys = "0.3.69"
bytes = "1"
thiserror = "1"
url = "2"

[dependencies.web-sys]
version = "0.3.69"
Expand All @@ -24,11 +26,13 @@ features = [
"ReadableStreamDefaultReader",
"ReadableStreamReadResult",
"WebTransport",
"WebTransportOptions",
"WebTransportBidirectionalStream",
"WebTransportCloseInfo",
"WebTransportSendStream",
"WebTransportReceiveStream",
"WebTransportDatagramDuplexStream",
"WebTransportCongestionControl",
"WritableStream",
"WritableStreamDefaultWriter",
]
2 changes: 0 additions & 2 deletions web-transport-wasm/rust-toolchain.toml

This file was deleted.

69 changes: 48 additions & 21 deletions web-transport-wasm/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,61 @@
use std::{error, fmt};
use wasm_bindgen::prelude::*;

use wasm_bindgen::JsValue;
#[derive(Clone, Debug, thiserror::Error)]
#[error("web error: {0:?}")]
pub struct WebError(js_sys::Error);

#[derive(Debug)]
pub struct WebError {
value: JsValue,
impl From<js_sys::Error> for WebError {
fn from(e: js_sys::Error) -> Self {
Self(e)
}
}

impl From<JsValue> for WebError {
fn from(value: JsValue) -> Self {
Self { value }
impl From<wasm_bindgen::JsValue> for WebError {
fn from(e: wasm_bindgen::JsValue) -> Self {
Self(e.into())
}
}

impl error::Error for WebError {}
pub trait WebErrorExt<T> {
fn throw(self) -> Result<T, WebError>;
}

impl fmt::Display for WebError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Print out the JsValue as a string
match self.value.as_string() {
Some(s) => write!(f, "{}", s),
None => write!(f, "{:?}", self.value),
}
impl<T, E: Into<WebError>> WebErrorExt<T> for Result<T, E> {
fn throw(self) -> Result<T, WebError> {
self.map_err(Into::into)
}
}

impl From<&str> for WebError {
fn from(value: &str) -> Self {
Self {
value: value.into(),
}
#[derive(Clone, Debug, thiserror::Error)]
#[error("read error: {0:?}")]
pub struct ReadError(#[from] WebError);

#[derive(Clone, Debug, thiserror::Error)]
#[error("write error: {0:?}")]
pub struct WriteError(#[from] WebError);

#[derive(Clone, Debug, thiserror::Error)]
pub enum SessionError {
// TODO distinguish between different kinds of errors
#[error("read error: {0}")]
Read(#[from] ReadError),

#[error("write error: {0}")]
Write(#[from] WriteError),

#[error("web error: {0}")]
Web(#[from] WebError),
}

pub(crate) trait PromiseExt {
fn ignore(self);
}

impl PromiseExt for js_sys::Promise {
// Ignore the result of the promise by using an empty catch.
fn ignore(self) {
let closure = Closure::wrap(Box::new(|_: JsValue| {}) as Box<dyn FnMut(JsValue)>);
let _ = self.catch(&closure);
closure.forget();
}
}
24 changes: 14 additions & 10 deletions web-transport-wasm/src/reader.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,45 @@
use js_sys::Reflect;
use wasm_bindgen::{JsCast, JsValue};
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
use web_sys::{ReadableStream, ReadableStreamDefaultReader, ReadableStreamReadResult};

use crate::WebError;
use crate::{PromiseExt, ReadError, WebErrorExt};

// Wrapper around ReadableStream
pub struct Reader {
inner: ReadableStreamDefaultReader,
}

impl Reader {
pub fn new(stream: &ReadableStream) -> Result<Self, WebError> {
pub fn new(stream: &ReadableStream) -> Result<Self, ReadError> {
let inner = stream.get_reader().unchecked_into();
Ok(Self { inner })
}

pub async fn read<T: JsCast>(&mut self) -> Result<Option<T>, WebError> {
let result: ReadableStreamReadResult = JsFuture::from(self.inner.read()).await?.into();
pub async fn read<T: JsCast>(&mut self) -> Result<Option<T>, ReadError> {
let result: ReadableStreamReadResult =
JsFuture::from(self.inner.read()).await.throw()?.into();

if Reflect::get(&result, &"done".into())?.is_truthy() {
if Reflect::get(&result, &"done".into()).throw()?.is_truthy() {
return Ok(None);
}

let res = Reflect::get(&result, &"value".into())?.dyn_into()?;
let res = Reflect::get(&result, &"value".into())
.throw()?
.unchecked_into();

Ok(Some(res))
}

pub fn close(self, reason: &str) {
pub fn close(&mut self, reason: &str) {
let str = JsValue::from_str(reason);
let _ = self.inner.cancel_with_reason(&str); // ignore the promise
self.inner.cancel_with_reason(&str).ignore();
}
}

impl Drop for Reader {
fn drop(&mut self) {
let _ = self.inner.cancel(); // ignore the promise
self.inner.cancel().ignore();
self.inner.release_lock();
}
}
45 changes: 23 additions & 22 deletions web-transport-wasm/src/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,15 @@ use bytes::{BufMut, Bytes, BytesMut};
use js_sys::Uint8Array;
use web_sys::WebTransportReceiveStream;

use crate::{Reader, WebError};
use crate::{ReadError, Reader};

pub struct RecvStream {
reader: Reader,
buffer: BytesMut,
}

impl RecvStream {
pub fn new(stream: WebTransportReceiveStream) -> Result<Self, WebError> {
if stream.locked() {
return Err("locked".into());
}

pub(super) fn new(stream: WebTransportReceiveStream) -> Result<Self, ReadError> {
let reader = Reader::new(&stream)?;

Ok(Self {
Expand All @@ -25,25 +21,30 @@ impl RecvStream {
})
}

pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, WebError> {
Ok(self.read_chunk(buf.len()).await?.map(|chunk| {
let size = chunk.len();
buf[..size].copy_from_slice(&chunk);
size
}))
pub async fn read(&mut self, buf: &mut [u8]) -> Result<Option<usize>, ReadError> {
let chunk = match self.read_chunk(buf.len()).await? {
Some(chunk) => chunk,
None => return Ok(None),
};

let size = chunk.len();
buf[..size].copy_from_slice(&chunk);
Ok(Some(size))
}

pub async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Result<bool, WebError> {
Ok(match self.read_chunk(buf.remaining_mut()).await? {
Some(chunk) => {
buf.put(chunk);
true
}
None => false,
})
pub async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Result<Option<usize>, ReadError> {
let chunk = match self.read_chunk(buf.remaining_mut()).await? {
Some(chunk) => chunk,
None => return Ok(None),
};

let size = chunk.len();
buf.put(chunk);

Ok(Some(size))
}

pub async fn read_chunk(&mut self, max: usize) -> Result<Option<Bytes>, WebError> {
pub async fn read_chunk(&mut self, max: usize) -> Result<Option<Bytes>, ReadError> {
if !self.buffer.is_empty() {
let size = cmp::min(max, self.buffer.len());
let data = self.buffer.split_to(size).freeze();
Expand All @@ -64,7 +65,7 @@ impl RecvStream {
Ok(Some(data))
}

pub fn stop(self, reason: &str) {
pub fn stop(&mut self, reason: &str) {
self.reader.close(reason);
}
}
19 changes: 10 additions & 9 deletions web-transport-wasm/src/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,36 @@ use bytes::{Buf, Bytes};
use js_sys::{Reflect, Uint8Array};
use web_sys::WebTransportSendStream;

use crate::{WebError, Writer};
use crate::{WriteError, Writer};

pub struct SendStream {
stream: WebTransportSendStream,
writer: Writer,
}

impl SendStream {
pub fn new(stream: WebTransportSendStream) -> Result<Self, WebError> {
pub(super) fn new(stream: WebTransportSendStream) -> Result<Self, WriteError> {
let writer = Writer::new(&stream)?;
Ok(Self { stream, writer })
}

pub async fn write(&mut self, buf: &[u8]) -> Result<usize, WebError> {
pub async fn write(&mut self, buf: &[u8]) -> Result<usize, WriteError> {
self.writer.write(&Uint8Array::from(buf)).await?;
Ok(buf.len())
}

pub async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Result<usize, WebError> {
let chunk = buf.chunk();
self.writer.write(&Uint8Array::from(chunk)).await?;
Ok(chunk.len())
pub async fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Result<usize, WriteError> {
let size = self.write(buf.chunk()).await?;
buf.advance(size);

Ok(size)
}

pub async fn write_chunk(&mut self, buf: Bytes) -> Result<(), WebError> {
pub async fn write_chunk(&mut self, buf: Bytes) -> Result<(), WriteError> {
self.write(&buf).await.map(|_| ())
}

pub fn reset(self, reason: &str) {
pub fn reset(&mut self, reason: &str) {
self.writer.close(reason);
}

Expand Down
Loading

0 comments on commit fdb5bd2

Please sign in to comment.