From 26007adef14838fe8207ea505da60fe1ca909d30 Mon Sep 17 00:00:00 2001 From: Griffin Obeid Date: Sat, 13 Jul 2024 11:37:38 -0400 Subject: [PATCH] Lock writable stream before getting writer (#17) * Lock writable stream before getting writer * Make sure stream is ready to write * cargo fmt --- src/webtransport.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/webtransport.rs b/src/webtransport.rs index cc6afc7..b2ebba5 100644 --- a/src/webtransport.rs +++ b/src/webtransport.rs @@ -326,18 +326,24 @@ impl WebTransportTask { let result: Result<(), anyhow::Error> = async move { let stream = transport.datagrams(); let stream: WritableStream = stream.writable(); + if stream.locked() { + return Err(anyhow::anyhow!("Stream is locked")); + } let writer = stream.get_writer().map_err(|e| anyhow!("{:?}", e))?; let data = Uint8Array::from(data.as_slice()); - let _stream = JsFuture::from(writer.write_with_chunk(&data)) + JsFuture::from(writer.ready()) .await - .map_err(|e| anyhow::anyhow!("{:?}", e))?; + .map_err(|e| anyhow!("{:?}", e))?; + JsFuture::from(writer.write_with_chunk(&data)) + .await + .map_err(|e| anyhow!("{:?}", e))?; writer.release_lock(); Ok(()) } .await; if let Err(e) = result { let e = e.to_string(); - log!("error: {}", e); + log!("error: ", e); transport_2.close(); } }); @@ -353,6 +359,9 @@ impl WebTransportTask { stream.map_err(|e| anyhow!("{:?}", e))?.unchecked_into(); let writer = stream.get_writer().map_err(|e| anyhow!("{:?}", e))?; let data = Uint8Array::from(data.as_slice()); + JsFuture::from(writer.ready()) + .await + .map_err(|e| anyhow!("{:?}", e))?; let _ = JsFuture::from(writer.write_with_chunk(&data)) .await .map_err(|e| anyhow::anyhow!("{:?}", e))?; @@ -420,6 +429,9 @@ impl WebTransportTask { .get_writer() .map_err(|e| anyhow!("{:?}", e))?; + JsFuture::from(writer.ready()) + .await + .map_err(|e| anyhow!("{:?}", e))?; let data = Uint8Array::from(data.as_slice()); let _ = JsFuture::from(writer.write_with_chunk(&data)) .await