From 12079caa69c7bb271e83449760a3e3752d4233f5 Mon Sep 17 00:00:00 2001 From: Dario A Lencina-Talarico Date: Fri, 26 Jul 2024 20:19:56 -0400 Subject: [PATCH] fix streams (#20) * fix streams * bump linux * add ready check * add better errors * remove close just to see if the connection stays functional * adding logs * forget callbacks * bump minor * fuck woke * fmt --- .github/workflows/ci.yaml | 2 +- Cargo.toml | 2 +- src/webtransport.rs | 210 +++++++++++++++++++------------------- 3 files changed, 108 insertions(+), 106 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index c782bc3..6511a15 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -13,7 +13,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - uses: dtolnay/rust-toolchain@1.70 + - uses: dtolnay/rust-toolchain@1.73 with: components: clippy, rustfmt - run: RUSTFLAGS=--cfg=web_sys_unstable_apis cargo clippy -- --deny warnings diff --git a/Cargo.toml b/Cargo.toml index f8b3ea6..4c29860 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yew-webtransport" -version = "0.21.0" +version = "0.21.1" edition = "2021" repository = "https://github.com/security-union/yew-webtransport.git" description = "WebTransport is an API offering low-latency, bidirectional, client-server messaging." diff --git a/src/webtransport.rs b/src/webtransport.rs index b2ebba5..d0d1115 100644 --- a/src/webtransport.rs +++ b/src/webtransport.rs @@ -93,8 +93,6 @@ pub struct WebTransportTask { notification: Callback, #[allow(dead_code)] listeners: [Promise; 2], - #[allow(dead_code)] - callbacks: [Closure; 2], } impl WebTransportTask { @@ -102,13 +100,11 @@ impl WebTransportTask { transport: Rc, notification: Callback, listeners: [Promise; 2], - callbacks: [Closure; 2], ) -> WebTransportTask { WebTransportTask { transport, notification, listeners, - callbacks, } } } @@ -133,8 +129,7 @@ impl WebTransportService { on_bidirectional_stream: Callback, notification: Callback, ) -> Result { - let ConnectCommon(transport, listeners, callbacks) = - Self::connect_common(url, ¬ification)?; + let ConnectCommon(transport, listeners) = Self::connect_common(url, ¬ification)?; let transport = Rc::new(transport); Self::start_listening_incoming_datagrams( @@ -154,12 +149,7 @@ impl WebTransportService { on_bidirectional_stream, ); - Ok(WebTransportTask::new( - transport, - notification, - listeners, - callbacks, - )) + Ok(WebTransportTask::new(transport, notification, listeners)) } fn start_listening_incoming_unidirectional_streams( @@ -302,15 +292,17 @@ impl WebTransportService { .closed() .then(&closed_closure) .catch(&closed_closure); + // forget closures, this is a minor leak but it prevents weird issues downstream + opened_closure.forget(); + closed_closure.forget(); { let listeners = [ready, closed]; - let callbacks = [opened_closure, closed_closure]; - Ok(ConnectCommon(transport, listeners, callbacks)) + Ok(ConnectCommon(transport, listeners)) } } } -struct ConnectCommon(WebTransport, [Promise; 2], [Closure; 2]); +struct ConnectCommon(WebTransport, [Promise; 2]); pub fn process_binary(bytes: &Uint8Array, callback: &Callback>) { let data = bytes.to_vec(); @@ -322,29 +314,31 @@ impl WebTransportTask { pub fn send_datagram(transport: Rc, data: Vec) { wasm_bindgen_futures::spawn_local(async move { let transport = transport.clone(); - let transport_2 = transport.clone(); - let result: Result<(), anyhow::Error> = async move { - let stream = transport.datagrams(); - let stream: WritableStream = stream.writable(); - if stream.locked() { - return Err(anyhow::anyhow!("Stream is locked")); + let result: Result<(), anyhow::Error> = { + let transport = transport.clone(); + async move { + let stream = transport.datagrams(); + let stream: WritableStream = stream.writable(); + if stream.locked() { + return Err(anyhow::anyhow!("Stream is locked")); + } + let writer = stream.get_writer().map_err(|e| anyhow!("{:?}", e))?; + let data = Uint8Array::from(data.as_slice()); + JsFuture::from(writer.ready()) + .await + .map_err(|e| anyhow!("{:?}", e))?; + JsFuture::from(writer.write_with_chunk(&data)) + .await + .map_err(|e| anyhow!("{:?}", e))?; + writer.release_lock(); + Ok(()) } - let writer = stream.get_writer().map_err(|e| anyhow!("{:?}", e))?; - let data = Uint8Array::from(data.as_slice()); - JsFuture::from(writer.ready()) - .await - .map_err(|e| anyhow!("{:?}", e))?; - JsFuture::from(writer.write_with_chunk(&data)) - .await - .map_err(|e| anyhow!("{:?}", e))?; - writer.release_lock(); - Ok(()) } .await; if let Err(e) = result { let e = e.to_string(); log!("error: ", e); - transport_2.close(); + transport.close(); } }); } @@ -352,30 +346,38 @@ impl WebTransportTask { pub fn send_unidirectional_stream(transport: Rc, data: Vec) { wasm_bindgen_futures::spawn_local(async move { let transport = transport.clone(); - let transport_2 = transport.clone(); - let result: Result<(), anyhow::Error> = async move { - let stream = JsFuture::from(transport.create_unidirectional_stream()).await; - let stream: WritableStream = - stream.map_err(|e| anyhow!("{:?}", e))?.unchecked_into(); - let writer = stream.get_writer().map_err(|e| anyhow!("{:?}", e))?; - let data = Uint8Array::from(data.as_slice()); - JsFuture::from(writer.ready()) - .await - .map_err(|e| anyhow!("{:?}", e))?; - let _ = JsFuture::from(writer.write_with_chunk(&data)) - .await - .map_err(|e| anyhow::anyhow!("{:?}", e))?; - writer.release_lock(); - JsFuture::from(stream.close()) - .await - .map_err(|e| anyhow::anyhow!("{:?}", e))?; - Ok(()) + let result: Result<(), anyhow::Error> = { + let transport = transport.clone(); + async move { + let _ = JsFuture::from(transport.ready()) + .await + .map_err(|e| anyhow!("{:?}", e))?; + let stream = JsFuture::from(transport.create_unidirectional_stream()).await; + let stream: WritableStream = stream + .map_err(|e| anyhow!("failed to create Writeable stream {:?}", e))? + .unchecked_into(); + let writer = stream + .get_writer() + .map_err(|e| anyhow!("Error getting writer {:?}", e))?; + let data = Uint8Array::from(data.as_slice()); + JsFuture::from(writer.ready()) + .await + .map_err(|e| anyhow!("Error getting writer ready {:?}", e))?; + let _ = JsFuture::from(writer.write_with_chunk(&data)) + .await + .map_err(|e| anyhow::anyhow!("Error writing to stream: {:?}", e))?; + writer.release_lock(); + JsFuture::from(stream.close()) + .await + .map_err(|e| anyhow::anyhow!("Error closing stream {:?}", e))?; + Ok(()) + } } .await; if let Err(e) = result { let e = e.to_string(); - log!("error: {}", e); - transport_2.close(); + log!("error: ", e); + transport.close(); } }); } @@ -387,68 +389,68 @@ impl WebTransportTask { ) { wasm_bindgen_futures::spawn_local(async move { let transport = transport.clone(); - let transport_2 = transport.clone(); - let result: Result<(), anyhow::Error> = async move { - let stream = JsFuture::from(transport.create_bidirectional_stream()).await; - let stream: WebTransportBidirectionalStream = - stream.map_err(|e| anyhow!("{:?}", e))?.unchecked_into(); - let readable: ReadableStreamDefaultReader = - stream.readable().get_reader().unchecked_into(); - let (sender, receiver) = channel(); - wasm_bindgen_futures::spawn_local(async move { - loop { - let read_result = JsFuture::from(readable.read()).await; - match read_result { - Err(e) => { - let mut reason = WebTransportCloseInfo::default(); - reason.reason( - format!("Failed to read incoming stream {e:?}").as_str(), - ); - transport.close_with_close_info(&reason); - break; - } - Ok(result) => { - let done = Reflect::get(&result, &JsString::from("done")) - .unwrap() - .unchecked_into::(); - if done.is_truthy() { + let result: Result<(), anyhow::Error> = { + let transport = transport.clone(); + async move { + let stream = JsFuture::from(transport.create_bidirectional_stream()).await; + let stream: WebTransportBidirectionalStream = + stream.map_err(|e| anyhow!("{:?}", e))?.unchecked_into(); + let readable: ReadableStreamDefaultReader = + stream.readable().get_reader().unchecked_into(); + let (sender, receiver) = channel(); + wasm_bindgen_futures::spawn_local(async move { + loop { + let read_result = JsFuture::from(readable.read()).await; + match read_result { + Err(e) => { + let mut reason = WebTransportCloseInfo::default(); + reason.reason( + format!("Failed to read incoming stream {e:?}").as_str(), + ); + transport.close_with_close_info(&reason); break; } - let value: Uint8Array = - Reflect::get(&result, &JsString::from("value")) + Ok(result) => { + let done = Reflect::get(&result, &JsString::from("done")) .unwrap() - .unchecked_into(); - process_binary(&value, &callback); + .unchecked_into::(); + if done.is_truthy() { + break; + } + let value: Uint8Array = + Reflect::get(&result, &JsString::from("value")) + .unwrap() + .unchecked_into(); + process_binary(&value, &callback); + } } } - } - sender.send(true).unwrap(); - }); - let writer = stream - .writable() - .get_writer() - .map_err(|e| anyhow!("{:?}", e))?; - - JsFuture::from(writer.ready()) - .await - .map_err(|e| anyhow!("{:?}", e))?; - let data = Uint8Array::from(data.as_slice()); - let _ = JsFuture::from(writer.write_with_chunk(&data)) - .await - .map_err(|e| anyhow::anyhow!("{:?}", e))?; - JsFuture::from(writer.close()) - .await - .map_err(|e| anyhow::anyhow!("{:?}", e))?; - - let _ = receiver.await; - - Ok(()) + sender.send(true).unwrap(); + }); + let writer = stream + .writable() + .get_writer() + .map_err(|e| anyhow!("{:?}", e))?; + + JsFuture::from(writer.ready()) + .await + .map_err(|e| anyhow!("{:?}", e))?; + let data = Uint8Array::from(data.as_slice()); + let _ = JsFuture::from(writer.write_with_chunk(&data)) + .await + .map_err(|e| anyhow::anyhow!("{:?}", e))?; + JsFuture::from(writer.close()) + .await + .map_err(|e| anyhow::anyhow!("{:?}", e))?; + let _ = receiver.await?; + Ok(()) + } } .await; if let Err(e) = result { let e = e.to_string(); log!("error: {}", e); - transport_2.close(); + transport.close(); } }); }