Skip to content

Commit

Permalink
fix streams (#20)
Browse files Browse the repository at this point in the history
* fix streams

* bump linux

* add ready check

* add better errors

* remove close just to see if the connection stays functional

* adding logs

* forget callbacks

* bump minor

* fuck woke

* fmt
  • Loading branch information
darioalessandro authored Jul 27, 2024
1 parent 26007ad commit 12079ca
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 106 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@1.70
- uses: dtolnay/rust-toolchain@1.73
with:
components: clippy, rustfmt
- run: RUSTFLAGS=--cfg=web_sys_unstable_apis cargo clippy -- --deny warnings
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yew-webtransport"
version = "0.21.0"
version = "0.21.1"
edition = "2021"
repository = "https://github.com/security-union/yew-webtransport.git"
description = "WebTransport is an API offering low-latency, bidirectional, client-server messaging."
Expand Down
210 changes: 106 additions & 104 deletions src/webtransport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,18 @@ pub struct WebTransportTask {
notification: Callback<WebTransportStatus>,
#[allow(dead_code)]
listeners: [Promise; 2],
#[allow(dead_code)]
callbacks: [Closure<dyn FnMut(JsValue)>; 2],
}

impl WebTransportTask {
fn new(
transport: Rc<WebTransport>,
notification: Callback<WebTransportStatus>,
listeners: [Promise; 2],
callbacks: [Closure<dyn FnMut(JsValue)>; 2],
) -> WebTransportTask {
WebTransportTask {
transport,
notification,
listeners,
callbacks,
}
}
}
Expand All @@ -133,8 +129,7 @@ impl WebTransportService {
on_bidirectional_stream: Callback<WebTransportBidirectionalStream>,
notification: Callback<WebTransportStatus>,
) -> Result<WebTransportTask, WebTransportError> {
let ConnectCommon(transport, listeners, callbacks) =
Self::connect_common(url, &notification)?;
let ConnectCommon(transport, listeners) = Self::connect_common(url, &notification)?;
let transport = Rc::new(transport);

Self::start_listening_incoming_datagrams(
Expand All @@ -154,12 +149,7 @@ impl WebTransportService {
on_bidirectional_stream,
);

Ok(WebTransportTask::new(
transport,
notification,
listeners,
callbacks,
))
Ok(WebTransportTask::new(transport, notification, listeners))
}

fn start_listening_incoming_unidirectional_streams(
Expand Down Expand Up @@ -302,15 +292,17 @@ impl WebTransportService {
.closed()
.then(&closed_closure)
.catch(&closed_closure);
// forget closures, this is a minor leak but it prevents weird issues downstream
opened_closure.forget();
closed_closure.forget();

{
let listeners = [ready, closed];
let callbacks = [opened_closure, closed_closure];
Ok(ConnectCommon(transport, listeners, callbacks))
Ok(ConnectCommon(transport, listeners))
}
}
}
struct ConnectCommon(WebTransport, [Promise; 2], [Closure<dyn FnMut(JsValue)>; 2]);
struct ConnectCommon(WebTransport, [Promise; 2]);

pub fn process_binary(bytes: &Uint8Array, callback: &Callback<Vec<u8>>) {
let data = bytes.to_vec();
Expand All @@ -322,60 +314,70 @@ impl WebTransportTask {
pub fn send_datagram(transport: Rc<WebTransport>, data: Vec<u8>) {
wasm_bindgen_futures::spawn_local(async move {
let transport = transport.clone();
let transport_2 = transport.clone();
let result: Result<(), anyhow::Error> = async move {
let stream = transport.datagrams();
let stream: WritableStream = stream.writable();
if stream.locked() {
return Err(anyhow::anyhow!("Stream is locked"));
let result: Result<(), anyhow::Error> = {
let transport = transport.clone();
async move {
let stream = transport.datagrams();
let stream: WritableStream = stream.writable();
if stream.locked() {
return Err(anyhow::anyhow!("Stream is locked"));
}
let writer = stream.get_writer().map_err(|e| anyhow!("{:?}", e))?;
let data = Uint8Array::from(data.as_slice());
JsFuture::from(writer.ready())
.await
.map_err(|e| anyhow!("{:?}", e))?;
JsFuture::from(writer.write_with_chunk(&data))
.await
.map_err(|e| anyhow!("{:?}", e))?;
writer.release_lock();
Ok(())
}
let writer = stream.get_writer().map_err(|e| anyhow!("{:?}", e))?;
let data = Uint8Array::from(data.as_slice());
JsFuture::from(writer.ready())
.await
.map_err(|e| anyhow!("{:?}", e))?;
JsFuture::from(writer.write_with_chunk(&data))
.await
.map_err(|e| anyhow!("{:?}", e))?;
writer.release_lock();
Ok(())
}
.await;
if let Err(e) = result {
let e = e.to_string();
log!("error: ", e);
transport_2.close();
transport.close();
}
});
}

pub fn send_unidirectional_stream(transport: Rc<WebTransport>, data: Vec<u8>) {
wasm_bindgen_futures::spawn_local(async move {
let transport = transport.clone();
let transport_2 = transport.clone();
let result: Result<(), anyhow::Error> = async move {
let stream = JsFuture::from(transport.create_unidirectional_stream()).await;
let stream: WritableStream =
stream.map_err(|e| anyhow!("{:?}", e))?.unchecked_into();
let writer = stream.get_writer().map_err(|e| anyhow!("{:?}", e))?;
let data = Uint8Array::from(data.as_slice());
JsFuture::from(writer.ready())
.await
.map_err(|e| anyhow!("{:?}", e))?;
let _ = JsFuture::from(writer.write_with_chunk(&data))
.await
.map_err(|e| anyhow::anyhow!("{:?}", e))?;
writer.release_lock();
JsFuture::from(stream.close())
.await
.map_err(|e| anyhow::anyhow!("{:?}", e))?;
Ok(())
let result: Result<(), anyhow::Error> = {
let transport = transport.clone();
async move {
let _ = JsFuture::from(transport.ready())
.await
.map_err(|e| anyhow!("{:?}", e))?;
let stream = JsFuture::from(transport.create_unidirectional_stream()).await;
let stream: WritableStream = stream
.map_err(|e| anyhow!("failed to create Writeable stream {:?}", e))?
.unchecked_into();
let writer = stream
.get_writer()
.map_err(|e| anyhow!("Error getting writer {:?}", e))?;
let data = Uint8Array::from(data.as_slice());
JsFuture::from(writer.ready())
.await
.map_err(|e| anyhow!("Error getting writer ready {:?}", e))?;
let _ = JsFuture::from(writer.write_with_chunk(&data))
.await
.map_err(|e| anyhow::anyhow!("Error writing to stream: {:?}", e))?;
writer.release_lock();
JsFuture::from(stream.close())
.await
.map_err(|e| anyhow::anyhow!("Error closing stream {:?}", e))?;
Ok(())
}
}
.await;
if let Err(e) = result {
let e = e.to_string();
log!("error: {}", e);
transport_2.close();
log!("error: ", e);
transport.close();
}
});
}
Expand All @@ -387,68 +389,68 @@ impl WebTransportTask {
) {
wasm_bindgen_futures::spawn_local(async move {
let transport = transport.clone();
let transport_2 = transport.clone();
let result: Result<(), anyhow::Error> = async move {
let stream = JsFuture::from(transport.create_bidirectional_stream()).await;
let stream: WebTransportBidirectionalStream =
stream.map_err(|e| anyhow!("{:?}", e))?.unchecked_into();
let readable: ReadableStreamDefaultReader =
stream.readable().get_reader().unchecked_into();
let (sender, receiver) = channel();
wasm_bindgen_futures::spawn_local(async move {
loop {
let read_result = JsFuture::from(readable.read()).await;
match read_result {
Err(e) => {
let mut reason = WebTransportCloseInfo::default();
reason.reason(
format!("Failed to read incoming stream {e:?}").as_str(),
);
transport.close_with_close_info(&reason);
break;
}
Ok(result) => {
let done = Reflect::get(&result, &JsString::from("done"))
.unwrap()
.unchecked_into::<Boolean>();
if done.is_truthy() {
let result: Result<(), anyhow::Error> = {
let transport = transport.clone();
async move {
let stream = JsFuture::from(transport.create_bidirectional_stream()).await;
let stream: WebTransportBidirectionalStream =
stream.map_err(|e| anyhow!("{:?}", e))?.unchecked_into();
let readable: ReadableStreamDefaultReader =
stream.readable().get_reader().unchecked_into();
let (sender, receiver) = channel();
wasm_bindgen_futures::spawn_local(async move {
loop {
let read_result = JsFuture::from(readable.read()).await;
match read_result {
Err(e) => {
let mut reason = WebTransportCloseInfo::default();
reason.reason(
format!("Failed to read incoming stream {e:?}").as_str(),
);
transport.close_with_close_info(&reason);
break;
}
let value: Uint8Array =
Reflect::get(&result, &JsString::from("value"))
Ok(result) => {
let done = Reflect::get(&result, &JsString::from("done"))
.unwrap()
.unchecked_into();
process_binary(&value, &callback);
.unchecked_into::<Boolean>();
if done.is_truthy() {
break;
}
let value: Uint8Array =
Reflect::get(&result, &JsString::from("value"))
.unwrap()
.unchecked_into();
process_binary(&value, &callback);
}
}
}
}
sender.send(true).unwrap();
});
let writer = stream
.writable()
.get_writer()
.map_err(|e| anyhow!("{:?}", e))?;

JsFuture::from(writer.ready())
.await
.map_err(|e| anyhow!("{:?}", e))?;
let data = Uint8Array::from(data.as_slice());
let _ = JsFuture::from(writer.write_with_chunk(&data))
.await
.map_err(|e| anyhow::anyhow!("{:?}", e))?;
JsFuture::from(writer.close())
.await
.map_err(|e| anyhow::anyhow!("{:?}", e))?;

let _ = receiver.await;

Ok(())
sender.send(true).unwrap();
});
let writer = stream
.writable()
.get_writer()
.map_err(|e| anyhow!("{:?}", e))?;

JsFuture::from(writer.ready())
.await
.map_err(|e| anyhow!("{:?}", e))?;
let data = Uint8Array::from(data.as_slice());
let _ = JsFuture::from(writer.write_with_chunk(&data))
.await
.map_err(|e| anyhow::anyhow!("{:?}", e))?;
JsFuture::from(writer.close())
.await
.map_err(|e| anyhow::anyhow!("{:?}", e))?;
let _ = receiver.await?;
Ok(())
}
}
.await;
if let Err(e) = result {
let e = e.to_string();
log!("error: {}", e);
transport_2.close();
transport.close();
}
});
}
Expand Down

0 comments on commit 12079ca

Please sign in to comment.