Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix streams #20

Merged
merged 10 commits into from
Jul 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@1.70
- uses: dtolnay/rust-toolchain@1.73
with:
components: clippy, rustfmt
- run: RUSTFLAGS=--cfg=web_sys_unstable_apis cargo clippy -- --deny warnings
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yew-webtransport"
version = "0.21.0"
version = "0.21.1"
edition = "2021"
repository = "https://github.com/security-union/yew-webtransport.git"
description = "WebTransport is an API offering low-latency, bidirectional, client-server messaging."
Expand Down
210 changes: 106 additions & 104 deletions src/webtransport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,18 @@ pub struct WebTransportTask {
notification: Callback<WebTransportStatus>,
#[allow(dead_code)]
listeners: [Promise; 2],
#[allow(dead_code)]
callbacks: [Closure<dyn FnMut(JsValue)>; 2],
}

impl WebTransportTask {
fn new(
transport: Rc<WebTransport>,
notification: Callback<WebTransportStatus>,
listeners: [Promise; 2],
callbacks: [Closure<dyn FnMut(JsValue)>; 2],
) -> WebTransportTask {
WebTransportTask {
transport,
notification,
listeners,
callbacks,
}
}
}
Expand All @@ -133,8 +129,7 @@ impl WebTransportService {
on_bidirectional_stream: Callback<WebTransportBidirectionalStream>,
notification: Callback<WebTransportStatus>,
) -> Result<WebTransportTask, WebTransportError> {
let ConnectCommon(transport, listeners, callbacks) =
Self::connect_common(url, &notification)?;
let ConnectCommon(transport, listeners) = Self::connect_common(url, &notification)?;
let transport = Rc::new(transport);

Self::start_listening_incoming_datagrams(
Expand All @@ -154,12 +149,7 @@ impl WebTransportService {
on_bidirectional_stream,
);

Ok(WebTransportTask::new(
transport,
notification,
listeners,
callbacks,
))
Ok(WebTransportTask::new(transport, notification, listeners))
}

fn start_listening_incoming_unidirectional_streams(
Expand Down Expand Up @@ -302,15 +292,17 @@ impl WebTransportService {
.closed()
.then(&closed_closure)
.catch(&closed_closure);
// forget closures, this is a minor leak but it prevents weird issues downstream
opened_closure.forget();
closed_closure.forget();

{
let listeners = [ready, closed];
let callbacks = [opened_closure, closed_closure];
Ok(ConnectCommon(transport, listeners, callbacks))
Ok(ConnectCommon(transport, listeners))
}
}
}
struct ConnectCommon(WebTransport, [Promise; 2], [Closure<dyn FnMut(JsValue)>; 2]);
struct ConnectCommon(WebTransport, [Promise; 2]);

pub fn process_binary(bytes: &Uint8Array, callback: &Callback<Vec<u8>>) {
let data = bytes.to_vec();
Expand All @@ -322,60 +314,70 @@ impl WebTransportTask {
pub fn send_datagram(transport: Rc<WebTransport>, data: Vec<u8>) {
wasm_bindgen_futures::spawn_local(async move {
let transport = transport.clone();
let transport_2 = transport.clone();
let result: Result<(), anyhow::Error> = async move {
let stream = transport.datagrams();
let stream: WritableStream = stream.writable();
if stream.locked() {
return Err(anyhow::anyhow!("Stream is locked"));
let result: Result<(), anyhow::Error> = {
let transport = transport.clone();
async move {
let stream = transport.datagrams();
let stream: WritableStream = stream.writable();
if stream.locked() {
return Err(anyhow::anyhow!("Stream is locked"));
}
let writer = stream.get_writer().map_err(|e| anyhow!("{:?}", e))?;
let data = Uint8Array::from(data.as_slice());
JsFuture::from(writer.ready())
.await
.map_err(|e| anyhow!("{:?}", e))?;
JsFuture::from(writer.write_with_chunk(&data))
.await
.map_err(|e| anyhow!("{:?}", e))?;
writer.release_lock();
Ok(())
}
let writer = stream.get_writer().map_err(|e| anyhow!("{:?}", e))?;
let data = Uint8Array::from(data.as_slice());
JsFuture::from(writer.ready())
.await
.map_err(|e| anyhow!("{:?}", e))?;
JsFuture::from(writer.write_with_chunk(&data))
.await
.map_err(|e| anyhow!("{:?}", e))?;
writer.release_lock();
Ok(())
}
.await;
if let Err(e) = result {
let e = e.to_string();
log!("error: ", e);
transport_2.close();
transport.close();
}
});
}

pub fn send_unidirectional_stream(transport: Rc<WebTransport>, data: Vec<u8>) {
wasm_bindgen_futures::spawn_local(async move {
let transport = transport.clone();
let transport_2 = transport.clone();
let result: Result<(), anyhow::Error> = async move {
let stream = JsFuture::from(transport.create_unidirectional_stream()).await;
let stream: WritableStream =
stream.map_err(|e| anyhow!("{:?}", e))?.unchecked_into();
let writer = stream.get_writer().map_err(|e| anyhow!("{:?}", e))?;
let data = Uint8Array::from(data.as_slice());
JsFuture::from(writer.ready())
.await
.map_err(|e| anyhow!("{:?}", e))?;
let _ = JsFuture::from(writer.write_with_chunk(&data))
.await
.map_err(|e| anyhow::anyhow!("{:?}", e))?;
writer.release_lock();
JsFuture::from(stream.close())
.await
.map_err(|e| anyhow::anyhow!("{:?}", e))?;
Ok(())
let result: Result<(), anyhow::Error> = {
let transport = transport.clone();
async move {
let _ = JsFuture::from(transport.ready())
.await
.map_err(|e| anyhow!("{:?}", e))?;
let stream = JsFuture::from(transport.create_unidirectional_stream()).await;
let stream: WritableStream = stream
.map_err(|e| anyhow!("failed to create Writeable stream {:?}", e))?
.unchecked_into();
let writer = stream
.get_writer()
.map_err(|e| anyhow!("Error getting writer {:?}", e))?;
let data = Uint8Array::from(data.as_slice());
JsFuture::from(writer.ready())
.await
.map_err(|e| anyhow!("Error getting writer ready {:?}", e))?;
let _ = JsFuture::from(writer.write_with_chunk(&data))
.await
.map_err(|e| anyhow::anyhow!("Error writing to stream: {:?}", e))?;
writer.release_lock();
JsFuture::from(stream.close())
.await
.map_err(|e| anyhow::anyhow!("Error closing stream {:?}", e))?;
Ok(())
}
}
.await;
if let Err(e) = result {
let e = e.to_string();
log!("error: {}", e);
transport_2.close();
log!("error: ", e);
transport.close();
}
});
}
Expand All @@ -387,68 +389,68 @@ impl WebTransportTask {
) {
wasm_bindgen_futures::spawn_local(async move {
let transport = transport.clone();
let transport_2 = transport.clone();
let result: Result<(), anyhow::Error> = async move {
let stream = JsFuture::from(transport.create_bidirectional_stream()).await;
let stream: WebTransportBidirectionalStream =
stream.map_err(|e| anyhow!("{:?}", e))?.unchecked_into();
let readable: ReadableStreamDefaultReader =
stream.readable().get_reader().unchecked_into();
let (sender, receiver) = channel();
wasm_bindgen_futures::spawn_local(async move {
loop {
let read_result = JsFuture::from(readable.read()).await;
match read_result {
Err(e) => {
let mut reason = WebTransportCloseInfo::default();
reason.reason(
format!("Failed to read incoming stream {e:?}").as_str(),
);
transport.close_with_close_info(&reason);
break;
}
Ok(result) => {
let done = Reflect::get(&result, &JsString::from("done"))
.unwrap()
.unchecked_into::<Boolean>();
if done.is_truthy() {
let result: Result<(), anyhow::Error> = {
let transport = transport.clone();
async move {
let stream = JsFuture::from(transport.create_bidirectional_stream()).await;
let stream: WebTransportBidirectionalStream =
stream.map_err(|e| anyhow!("{:?}", e))?.unchecked_into();
let readable: ReadableStreamDefaultReader =
stream.readable().get_reader().unchecked_into();
let (sender, receiver) = channel();
wasm_bindgen_futures::spawn_local(async move {
loop {
let read_result = JsFuture::from(readable.read()).await;
match read_result {
Err(e) => {
let mut reason = WebTransportCloseInfo::default();
reason.reason(
format!("Failed to read incoming stream {e:?}").as_str(),
);
transport.close_with_close_info(&reason);
break;
}
let value: Uint8Array =
Reflect::get(&result, &JsString::from("value"))
Ok(result) => {
let done = Reflect::get(&result, &JsString::from("done"))
.unwrap()
.unchecked_into();
process_binary(&value, &callback);
.unchecked_into::<Boolean>();
if done.is_truthy() {
break;
}
let value: Uint8Array =
Reflect::get(&result, &JsString::from("value"))
.unwrap()
.unchecked_into();
process_binary(&value, &callback);
}
}
}
}
sender.send(true).unwrap();
});
let writer = stream
.writable()
.get_writer()
.map_err(|e| anyhow!("{:?}", e))?;

JsFuture::from(writer.ready())
.await
.map_err(|e| anyhow!("{:?}", e))?;
let data = Uint8Array::from(data.as_slice());
let _ = JsFuture::from(writer.write_with_chunk(&data))
.await
.map_err(|e| anyhow::anyhow!("{:?}", e))?;
JsFuture::from(writer.close())
.await
.map_err(|e| anyhow::anyhow!("{:?}", e))?;

let _ = receiver.await;

Ok(())
sender.send(true).unwrap();
});
let writer = stream
.writable()
.get_writer()
.map_err(|e| anyhow!("{:?}", e))?;

JsFuture::from(writer.ready())
.await
.map_err(|e| anyhow!("{:?}", e))?;
let data = Uint8Array::from(data.as_slice());
let _ = JsFuture::from(writer.write_with_chunk(&data))
.await
.map_err(|e| anyhow::anyhow!("{:?}", e))?;
JsFuture::from(writer.close())
.await
.map_err(|e| anyhow::anyhow!("{:?}", e))?;
let _ = receiver.await?;
Ok(())
}
}
.await;
if let Err(e) = result {
let e = e.to_string();
log!("error: {}", e);
transport_2.close();
transport.close();
}
});
}
Expand Down
Loading