From 649f4cd26a74740ca7d8ebaa2a5df17e36c3246f Mon Sep 17 00:00:00 2001 From: piegames Date: Sat, 21 Jan 2023 11:25:17 +0100 Subject: [PATCH 1/8] Transit: decouple IO from TcpStream --- changelog.md | 2 + src/forwarding.rs | 12 +-- src/transfer.rs | 12 +-- src/transfer/v1.rs | 12 +-- src/transit.rs | 232 +++++++++++++++++++++++++----------------- src/transit/crypto.rs | 54 ++++++---- 6 files changed, 193 insertions(+), 131 deletions(-) diff --git a/changelog.md b/changelog.md index 14a983e6..45053253 100644 --- a/changelog.md +++ b/changelog.md @@ -2,6 +2,8 @@ ## Unreleased +- \[lib\]\[breaking\] replaced `transit::TransitInfo` with a struct containing the address, the old enum has been renamed to `transit::ConnectionType`. + ## Version 0.6.0 - Add shell completion support for the CLI diff --git a/src/forwarding.rs b/src/forwarding.rs index 55dc45cf..03dc3a8e 100644 --- a/src/forwarding.rs +++ b/src/forwarding.rs @@ -135,7 +135,7 @@ impl ForwardingError { /// as the value. pub async fn serve( mut wormhole: Wormhole, - transit_handler: impl FnOnce(transit::TransitInfo, std::net::SocketAddr), + transit_handler: impl FnOnce(transit::TransitInfo), relay_hints: Vec, targets: Vec<(Option, u16)>, cancel: impl Future, @@ -190,7 +190,7 @@ pub async fn serve( }, }; - let (mut transit, info, addr) = match connector + let (mut transit, info) = match connector .leader_connect( wormhole.key().derive_transit_key(wormhole.appid()), peer_version.transit_abilities, @@ -207,7 +207,7 @@ pub async fn serve( return Err(error); }, }; - transit_handler(info, addr); + transit_handler(info); /* We got a transit, now close the Wormhole */ wormhole.close().await?; @@ -518,7 +518,7 @@ impl ForwardingServe { /// no more than 1024 ports may be forwarded at once. pub async fn connect( mut wormhole: Wormhole, - transit_handler: impl FnOnce(transit::TransitInfo, std::net::SocketAddr), + transit_handler: impl FnOnce(transit::TransitInfo), relay_hints: Vec, bind_address: Option, custom_ports: &[u16], @@ -561,7 +561,7 @@ pub async fn connect( }, }; - let (mut transit, info, addr) = match connector + let (mut transit, info) = match connector .follower_connect( wormhole.key().derive_transit_key(wormhole.appid()), peer_version.transit_abilities, @@ -578,7 +578,7 @@ pub async fn connect( return Err(error); }, }; - transit_handler(info, addr); + transit_handler(info); /* We got a transit, now close the Wormhole */ wormhole.close().await?; diff --git a/src/transfer.rs b/src/transfer.rs index 08e620ff..36f57437 100644 --- a/src/transfer.rs +++ b/src/transfer.rs @@ -214,7 +214,7 @@ pub async fn send_file_or_folder( where N: AsRef, M: AsRef, - G: FnOnce(transit::TransitInfo, std::net::SocketAddr), + G: FnOnce(transit::TransitInfo), H: FnMut(u64, u64) + 'static, { use async_std::fs::File; @@ -271,7 +271,7 @@ pub async fn send_file( where F: AsyncRead + Unpin, N: Into, - G: FnOnce(transit::TransitInfo, std::net::SocketAddr), + G: FnOnce(transit::TransitInfo), H: FnMut(u64, u64) + 'static, { let _peer_version: AppVersion = serde_json::from_value(wormhole.peer_version.clone())?; @@ -312,7 +312,7 @@ pub async fn send_folder( where N: Into, M: Into, - G: FnOnce(transit::TransitInfo, std::net::SocketAddr), + G: FnOnce(transit::TransitInfo), H: FnMut(u64, u64) + 'static, { v1::send_folder( @@ -453,7 +453,7 @@ impl ReceiveRequest { ) -> Result<(), TransferError> where F: FnMut(u64, u64) + 'static, - G: FnOnce(transit::TransitInfo, std::net::SocketAddr), + G: FnOnce(transit::TransitInfo), W: AsyncWrite + Unpin, { let run = Box::pin(async { @@ -463,7 +463,7 @@ impl ReceiveRequest { .send_json(&PeerMessage::file_ack("ok")) .await?; - let (mut transit, info, addr) = self + let (mut transit, info) = self .connector .follower_connect( self.wormhole @@ -473,7 +473,7 @@ impl ReceiveRequest { self.their_hints.clone(), ) .await?; - transit_handler(info, addr); + transit_handler(info); debug!("Beginning file transfer"); v1::tcp_file_receive( diff --git a/src/transfer/v1.rs b/src/transfer/v1.rs index 1049c404..be758490 100644 --- a/src/transfer/v1.rs +++ b/src/transfer/v1.rs @@ -19,7 +19,7 @@ pub async fn send_file( where F: AsyncRead + Unpin, N: Into, - G: FnOnce(transit::TransitInfo, std::net::SocketAddr), + G: FnOnce(transit::TransitInfo), H: FnMut(u64, u64) + 'static, { let run = Box::pin(async { @@ -76,14 +76,14 @@ where } } - let (mut transit, info, addr) = connector + let (mut transit, info) = connector .leader_connect( wormhole.key().derive_transit_key(wormhole.appid()), their_abilities, Arc::new(their_hints), ) .await?; - transit_handler(info, addr); + transit_handler(info); debug!("Beginning file transfer"); @@ -121,7 +121,7 @@ pub async fn send_folder( where N: Into, M: Into, - G: FnOnce(transit::TransitInfo, std::net::SocketAddr), + G: FnOnce(transit::TransitInfo), H: FnMut(u64, u64) + 'static, { let run = Box::pin(async { @@ -224,14 +224,14 @@ where }, } - let (mut transit, info, addr) = connector + let (mut transit, info) = connector .leader_connect( wormhole.key().derive_transit_key(wormhole.appid()), their_abilities, Arc::new(their_hints), ) .await?; - transit_handler(info, addr); + transit_handler(info); debug!("Beginning file transfer"); diff --git a/src/transit.rs b/src/transit.rs index 76ac8b8b..5d1838a8 100644 --- a/src/transit.rs +++ b/src/transit.rs @@ -20,6 +20,7 @@ use async_std::{ io::{prelude::WriteExt, ReadExt}, net::{TcpListener, TcpStream}, }; +use futures::io::{AsyncRead, AsyncWrite}; #[allow(unused_imports)] /* We need them for the docs */ use futures::{future::TryFutureExt, Sink, SinkExt, Stream, StreamExt, TryStreamExt}; use log::*; @@ -554,14 +555,30 @@ impl TryFrom<&DirectHint> for SocketAddr { } } +/// Direct or relay #[derive(Clone, Debug, Eq, PartialEq)] #[non_exhaustive] -pub enum TransitInfo { +pub enum ConnectionType { + /// We are directly connected to our peer Direct, + /// We are connected to a relay server, and may even know its name Relay { name: Option }, } -type TransitConnection = (TcpStream, TransitInfo); +/// Metadata for the established transit connection +#[derive(Clone, Debug, Eq, PartialEq)] +#[non_exhaustive] +pub struct TransitInfo { + /// Whether we are connected directly or via a relay server + pub conn_type: ConnectionType, + /// Target address of our connection. This may be our peer, or the relay server. + /// This says nothing about the actual transport protocol used. + pub peer_addr: SocketAddr, + // Prevent exhaustive destructuring for future proofing + _unused: (), +} + +type TransitConnection = (Box, TransitInfo); fn set_socket_opts(socket: &socket2::Socket) -> std::io::Result<()> { socket.set_nonblocking(true)?; @@ -741,31 +758,37 @@ async fn get_external_ip() -> Result<(SocketAddr, TcpStream), StunError> { /// ```no_run /// use magic_wormhole as mw; /// # #[async_std::main] async fn main() -> Result<(), mw::transit::TransitConnectError> { -/// # let derived_key = todo!(); -/// # let their_abilities = todo!(); -/// # let their_hints = todo!(); -/// let connector: mw::transit::TransitConnector = todo!("transit::init(…).await?"); -/// let (mut transit, info, addr) = connector +/// # let derived_key = unimplemented!(); +/// # let their_abilities = unimplemented!(); +/// # let their_hints = unimplemented!(); +/// let connector: mw::transit::TransitConnector = unimplemented!("transit::init(…).await?"); +/// let (mut transit, info) = connector /// .leader_connect(derived_key, their_abilities, their_hints) /// .await?; -/// mw::transit::log_transit_connection(info, addr); +/// mw::transit::log_transit_connection(info); /// # Ok(()) /// # } /// ``` -pub fn log_transit_connection(info: TransitInfo, peer_addr: SocketAddr) { - match info { - TransitInfo::Direct => { - log::info!("Established direct transit connection to '{}'", peer_addr,); +pub fn log_transit_connection(info: TransitInfo) { + match info.conn_type { + ConnectionType::Direct => { + log::info!( + "Established direct transit connection to '{}'", + info.peer_addr, + ); }, - TransitInfo::Relay { name: Some(name) } => { + ConnectionType::Relay { name: Some(name) } => { log::info!( "Established transit connection via relay '{}' ({})", name, - peer_addr, + info.peer_addr, ); }, - TransitInfo::Relay { name: None } => { - log::info!("Established transit connection via relay ({})", peer_addr,); + ConnectionType::Relay { name: None } => { + log::info!( + "Established transit connection via relay ({})", + info.peer_addr, + ); }, } } @@ -939,7 +962,7 @@ impl TransitConnector { transit_key: Key, their_abilities: Abilities, their_hints: Arc, - ) -> Result<(Transit, TransitInfo, SocketAddr), TransitConnectError> { + ) -> Result<(Transit, TransitInfo), TransitConnectError> { let Self { sockets, our_abilities, @@ -969,7 +992,7 @@ impl TransitConnector { }), ); - let (mut transit, mut host_type) = async_std::future::timeout( + let (mut transit, mut finalizer, mut conn_info) = async_std::future::timeout( std::time::Duration::from_secs(60), connection_stream.next(), ) @@ -980,7 +1003,7 @@ impl TransitConnector { })? .ok_or(TransitConnectError::Handshake)?; - if host_type != TransitInfo::Direct && our_abilities.can_direct() { + if conn_info.conn_type != ConnectionType::Direct && our_abilities.can_direct() { log::debug!( "Established transit connection over relay. Trying to find a direct connection …" ); @@ -995,11 +1018,14 @@ impl TransitConnector { elapsed.mul_f32(0.3) }; let _ = async_std::future::timeout(to_wait, async { - while let Some((new_transit, new_host_type)) = connection_stream.next().await { + while let Some((new_transit, new_finalizer, new_conn_info)) = + connection_stream.next().await + { /* We already got a connection, so we're only interested in direct ones */ - if new_host_type == TransitInfo::Direct { + if new_conn_info.conn_type == ConnectionType::Direct { transit = new_transit; - host_type = new_host_type; + finalizer = new_finalizer; + conn_info = new_conn_info; log::debug!("Found direct connection; using that instead."); break; } @@ -1016,17 +1042,23 @@ impl TransitConnector { */ std::mem::drop(connection_stream); - let (mut socket, finalizer) = transit; let (tx, rx) = finalizer - .handshake_finalize(&mut socket) + .handshake_finalize(&mut transit) .await .map_err(|e| { log::debug!("`handshake_finalize` failed: {e}"); TransitConnectError::Handshake })?; - let addr = socket.peer_addr().unwrap(); - Ok((Transit { socket, tx, rx }, host_type, addr)) + // let socket = Box::new(socket) as Box; + Ok(( + Transit { + socket: transit, + tx, + rx, + }, + conn_info, + )) } /** @@ -1037,7 +1069,7 @@ impl TransitConnector { transit_key: Key, their_abilities: Abilities, their_hints: Arc, - ) -> Result<(Transit, TransitInfo, SocketAddr), TransitConnectError> { + ) -> Result<(Transit, TransitInfo), TransitConnectError> { let Self { sockets, our_abilities, @@ -1072,8 +1104,7 @@ impl TransitConnector { ) .await { - Ok(Some(((mut socket, finalizer), host_type))) => { - let addr = socket.peer_addr().unwrap(); + Ok(Some((mut socket, finalizer, conn_info))) => { let (tx, rx) = finalizer .handshake_finalize(&mut socket) .await @@ -1081,8 +1112,9 @@ impl TransitConnector { log::debug!("`handshake_finalize` failed: {e}"); TransitConnectError::Handshake })?; + // let socket = Box::new(socket) as Box; - Ok((Transit { socket, tx, rx }, host_type, addr)) + Ok((Transit { socket, tx, rx }, conn_info)) }, Ok(None) | Err(_) => { log::debug!("`follower_connect` timed out"); @@ -1115,8 +1147,31 @@ impl TransitConnector { their_abilities: Abilities, their_hints: Arc, socket: Option<(MaybeConnectedSocket, TcpListener)>, - ) -> impl Stream> - + 'static { + ) -> impl Stream> + 'static { + /* Take a tcp connection and transform it into a `TransitConnection` (mainly set timeouts) */ + fn wrap_tcp_connection( + socket: TcpStream, + conn_type: ConnectionType, + ) -> Result { + /* Set proper read and write timeouts. This will temporarily set the socket into blocking mode :/ */ + // https://github.com/async-rs/async-std/issues/499 + let socket = std::net::TcpStream::try_from(socket) + .expect("Internal error: this should not fail because we never cloned the socket"); + socket.set_write_timeout(Some(std::time::Duration::from_secs(120)))?; + socket.set_read_timeout(Some(std::time::Duration::from_secs(120)))?; + let socket: TcpStream = socket.into(); + + let info = TransitInfo { + conn_type, + peer_addr: socket + .peer_addr() + .expect("Internal error: socket must be IP"), + _unused: (), + }; + + Ok((Box::new(socket), info)) + } + /* Have socket => can direct */ assert!(socket.is_none() || our_abilities.can_direct()); @@ -1165,7 +1220,8 @@ impl TransitConnector { log::debug!("Connecting directly to {}", dest_addr); let socket = connect_custom(&local_addr, &dest_addr.into()).await?; log::debug!("Connected to {}!", dest_addr); - Ok((socket, TransitInfo::Direct)) + + wrap_tcp_connection(socket, ConnectionType::Direct) } }) .map(|fut| Box::pin(fut) as ConnectorFuture), @@ -1187,7 +1243,8 @@ impl TransitConnector { log::debug!("Connecting directly to {}", dest_addr); let socket = async_std::net::TcpStream::connect(&dest_addr).await?; log::debug!("Connected to {}!", dest_addr); - Ok((socket, TransitInfo::Direct)) + + wrap_tcp_connection(socket, ConnectionType::Direct) }) .map(|fut| Box::pin(fut) as ConnectorFuture), ), @@ -1197,7 +1254,7 @@ impl TransitConnector { None }; - /* Relay hints. Make sure that both sides adverize it, since it is fine to support it without providing own hints. */ + /* Relay hints. Make sure that both sides advertize it, since it is fine to support it without providing own hints. */ if our_abilities.can_relay() && their_abilities.can_relay() { /* Collect intermediate into HashSet for deduplication */ let mut relay_hints = Vec::::new(); @@ -1206,20 +1263,6 @@ impl TransitConnector { hint.merge_into(&mut relay_hints); } - /* Take a relay hint and try to connect to it */ - async fn hint_connector( - host: DirectHint, - name: Option, - ) -> Result { - log::debug!("Connecting to relay {}", host); - let transit = TcpStream::connect((host.hostname.as_str(), host.port)) - .err_into::() - .await?; - log::debug!("Connected to {}!", host); - - Ok((transit, TransitInfo::Relay { name })) - } - connectors = Box::new( connectors.chain( relay_hints @@ -1255,7 +1298,13 @@ impl TransitConnector { index as u64 * 5, )) .await; - hint_connector(host, name).await + log::debug!("Connecting to relay {}", host); + let socket = TcpStream::connect((host.hostname.as_str(), host.port)) + .err_into::() + .await?; + log::debug!("Connected to {}!", host); + + wrap_tcp_connection(socket, ConnectionType::Relay { name }) }) .map(|fut| Box::pin(fut) as ConnectorFuture), ), @@ -1273,29 +1322,25 @@ impl TransitConnector { let tside = tside2.clone(); let cryptor = cryptor2.clone(); async move { - let (socket, host_type) = fut.await?; - let transit = handshake_exchange( + let (socket, conn_info) = fut.await?; + let (transit, finalizer) = handshake_exchange( is_leader, tside, socket, - &host_type, + &conn_info.conn_type, &*cryptor, transit_key, ) .await?; - Ok((transit, host_type)) + Ok((transit, finalizer, conn_info)) } }) .map(|fut| { Box::pin(fut) - as BoxFuture< - Result<(HandshakeResult, TransitInfo), crypto::TransitHandshakeError>, - > + as BoxFuture> }), ) - as BoxIterator< - BoxFuture>, - >; + as BoxIterator>>; /* Also listen on some port just in case. */ if let Some(socket2) = socket2 { @@ -1306,20 +1351,21 @@ impl TransitConnector { let tside = tside.clone(); let cryptor = cryptor.clone(); let connect = || async { - let (stream, peer) = socket2.accept().await?; + let (socket, peer) = socket2.accept().await?; + let (socket, info) = + wrap_tcp_connection(socket, ConnectionType::Direct)?; log::debug!("Got connection from {}!", peer); - let transit = handshake_exchange( + let (transit, finalizer) = handshake_exchange( is_leader, tside.clone(), - stream, - &TransitInfo::Direct, + socket, + &ConnectionType::Direct, &*cryptor, transit_key.clone(), ) .await?; Result::<_, crypto::TransitHandshakeError>::Ok(( - transit, - TransitInfo::Direct, + transit, finalizer, info, )) }; loop { @@ -1337,25 +1383,27 @@ impl TransitConnector { }) .map(|fut| { Box::pin(fut) - as BoxFuture< - Result< - (HandshakeResult, TransitInfo), - crypto::TransitHandshakeError, - >, - > + as BoxFuture> }), ), ) - as BoxIterator< - BoxFuture< - Result<(HandshakeResult, TransitInfo), crypto::TransitHandshakeError>, - >, - >; + as BoxIterator>>; } connectors.collect::>() } } +/// Trait abstracting our socket used for communicating over the wire. +/// +/// Will be primarily instantiated by either a TCP or web socket. Custom methods +/// will be added in the future. +pub(self) trait TransitTransport: + AsyncRead + AsyncWrite + std::any::Any + Unpin + Send +{ +} + +impl TransitTransport for T where T: AsyncRead + AsyncWrite + std::any::Any + Unpin + Send {} + /** * An established Transit connection. * @@ -1364,7 +1412,7 @@ impl TransitConnector { */ pub struct Transit { /** Raw transit connection */ - socket: TcpStream, + socket: Box, tx: Box, rx: Box, } @@ -1414,7 +1462,11 @@ impl Transit { } } -type HandshakeResult = (TcpStream, Box); +type HandshakeResult = ( + Box, + Box, + TransitInfo, +); /** * Do a transit handshake exchange, to establish a direct connection. @@ -1428,20 +1480,18 @@ type HandshakeResult = (TcpStream, Box); async fn handshake_exchange( is_leader: bool, tside: Arc, - socket: TcpStream, - host_type: &TransitInfo, + mut socket: Box, + host_type: &ConnectionType, cryptor: &dyn crypto::TransitCryptoInit, key: Arc>, -) -> Result { - /* Set proper read and write timeouts. This will temporarily set the socket into blocking mode :/ */ - // https://github.com/async-rs/async-std/issues/499 - let socket = std::net::TcpStream::try_from(socket) - .expect("Internal error: this should not fail because we never cloned the socket"); - socket.set_write_timeout(Some(std::time::Duration::from_secs(120)))?; - socket.set_read_timeout(Some(std::time::Duration::from_secs(120)))?; - let mut socket: TcpStream = socket.into(); - - if host_type != &TransitInfo::Direct { +) -> Result< + ( + Box, + Box, + ), + crypto::TransitHandshakeError, +> { + if host_type != &ConnectionType::Direct { log::trace!("initiating relay handshake"); let sub_key = key.derive_subkey_from_purpose::("transit_relay_token"); diff --git a/src/transit/crypto.rs b/src/transit/crypto.rs index 6cda14c8..6650a07b 100644 --- a/src/transit/crypto.rs +++ b/src/transit/crypto.rs @@ -95,7 +95,7 @@ async fn write_transit_message( pub(super) trait TransitCryptoInitFinalizer: Send { fn handshake_finalize( self: Box, - socket: &mut TcpStream, + socket: &mut dyn TransitTransport, ) -> BoxFuture>; } @@ -104,7 +104,7 @@ pub(super) trait TransitCryptoInitFinalizer: Send { impl TransitCryptoInitFinalizer for DynTransitCrypto { fn handshake_finalize( self: Box, - _socket: &mut TcpStream, + _socket: &mut dyn TransitTransport, ) -> BoxFuture> { Box::pin(futures::future::ready(Ok(*self))) } @@ -116,11 +116,11 @@ pub(super) trait TransitCryptoInit: Send + Sync { // Yes, this method returns a nested future. TODO explain async fn handshake_leader( &self, - socket: &mut TcpStream, + socket: &mut dyn TransitTransport, ) -> Result, TransitHandshakeError>; async fn handshake_follower( &self, - socket: &mut TcpStream, + socket: &mut dyn TransitTransport, ) -> Result, TransitHandshakeError>; } @@ -140,7 +140,7 @@ pub struct SecretboxInit { impl TransitCryptoInit for SecretboxInit { async fn handshake_leader( &self, - socket: &mut TcpStream, + mut socket: &mut dyn TransitTransport, ) -> Result, TransitHandshakeError> { // 9. create record keys let rkey = self @@ -171,7 +171,7 @@ impl TransitCryptoInit for SecretboxInit { .to_hex() ); assert_eq!(expected_rx_handshake.len(), 89); - read_expect(socket, expected_rx_handshake.as_bytes()).await?; + read_expect(&mut socket, expected_rx_handshake.as_bytes()).await?; struct Finalizer { skey: Key, @@ -181,7 +181,7 @@ impl TransitCryptoInit for SecretboxInit { impl TransitCryptoInitFinalizer for Finalizer { fn handshake_finalize( self: Box, - socket: &mut TcpStream, + socket: &mut dyn TransitTransport, ) -> BoxFuture> { Box::pin(async move { socket.write_all(b"go\n").await?; @@ -205,7 +205,7 @@ impl TransitCryptoInit for SecretboxInit { async fn handshake_follower( &self, - socket: &mut TcpStream, + mut socket: &mut dyn TransitTransport, ) -> Result, TransitHandshakeError> { // 9. create record keys /* The order here is correct. The "sender" and "receiver" side are a misnomer and should be called @@ -240,7 +240,7 @@ impl TransitCryptoInit for SecretboxInit { .to_hex(), ); assert_eq!(expected_tx_handshake.len(), 90); - read_expect(socket, expected_tx_handshake.as_bytes()).await?; + read_expect(&mut socket, expected_tx_handshake.as_bytes()).await?; Ok(Box::new(( Box::new(SecretboxCryptoEncrypt { @@ -279,12 +279,16 @@ pub struct NoiseInit { impl TransitCryptoInit for NoiseInit { async fn handshake_leader( &self, - socket: &mut TcpStream, + mut socket: &mut dyn TransitTransport, ) -> Result, TransitHandshakeError> { socket .write_all(b"Magic-Wormhole Dilation Handshake v1 Leader\n\n") .await?; - read_expect(socket, b"Magic-Wormhole Dilation Handshake v1 Follower\n\n").await?; + read_expect( + &mut socket, + b"Magic-Wormhole Dilation Handshake v1 Follower\n\n", + ) + .await?; let mut handshake: NoiseHandshakeState = { let mut builder = noise_protocol::HandshakeStateBuilder::new(); @@ -296,16 +300,17 @@ impl TransitCryptoInit for NoiseInit { handshake.push_psk(&*self.key); // → psk, e - write_transit_message(socket, &handshake.write_message_vec(&[])?).await?; + write_transit_message(&mut socket, &handshake.write_message_vec(&[])?).await?; // ← e, ee - handshake.read_message(&read_transit_message(socket).await?, &mut [])?; + handshake.read_message(&read_transit_message(&mut socket).await?, &mut [])?; assert!(handshake.completed()); let (tx, mut rx) = handshake.get_ciphers(); // ← "" - let peer_confirmation_message = rx.decrypt_vec(&read_transit_message(socket).await?)?; + let peer_confirmation_message = + rx.decrypt_vec(&read_transit_message(&mut socket).await?)?; ensure!( peer_confirmation_message.len() == 0, TransitHandshakeError::HandshakeFailed @@ -319,11 +324,11 @@ impl TransitCryptoInit for NoiseInit { impl TransitCryptoInitFinalizer for Finalizer { fn handshake_finalize( mut self: Box, - socket: &mut TcpStream, + mut socket: &mut dyn TransitTransport, ) -> BoxFuture> { Box::pin(async move { // → "" - write_transit_message(socket, &self.tx.encrypt_vec(&[])).await?; + write_transit_message(&mut socket, &self.tx.encrypt_vec(&[])).await?; Ok::<_, TransitHandshakeError>(( Box::new(NoiseCryptoEncrypt { tx: self.tx }) @@ -340,12 +345,16 @@ impl TransitCryptoInit for NoiseInit { async fn handshake_follower( &self, - socket: &mut TcpStream, + mut socket: &mut dyn TransitTransport, ) -> Result, TransitHandshakeError> { socket .write_all(b"Magic-Wormhole Dilation Handshake v1 Follower\n\n") .await?; - read_expect(socket, b"Magic-Wormhole Dilation Handshake v1 Leader\n\n").await?; + read_expect( + &mut socket, + b"Magic-Wormhole Dilation Handshake v1 Leader\n\n", + ) + .await?; let mut handshake: NoiseHandshakeState = { let mut builder = noise_protocol::HandshakeStateBuilder::new(); @@ -357,20 +366,21 @@ impl TransitCryptoInit for NoiseInit { handshake.push_psk(&*self.key); // ← psk, e - handshake.read_message(&read_transit_message(socket).await?, &mut [])?; + handshake.read_message(&read_transit_message(&mut socket).await?, &mut [])?; // → e, ee - write_transit_message(socket, &handshake.write_message_vec(&[])?).await?; + write_transit_message(&mut socket, &handshake.write_message_vec(&[])?).await?; assert!(handshake.completed()); // Warning: rx and tx are swapped here (read the `get_ciphers` doc carefully) let (mut rx, mut tx) = handshake.get_ciphers(); // → "" - write_transit_message(socket, &tx.encrypt_vec(&[])).await?; + write_transit_message(&mut socket, &tx.encrypt_vec(&[])).await?; // ← "" - let peer_confirmation_message = rx.decrypt_vec(&read_transit_message(socket).await?)?; + let peer_confirmation_message = + rx.decrypt_vec(&read_transit_message(&mut socket).await?)?; ensure!( peer_confirmation_message.len() == 0, TransitHandshakeError::HandshakeFailed From 72fb575908ba83157d76dac3bddb6d00ade13fb2 Mon Sep 17 00:00:00 2001 From: piegames Date: Tue, 31 Jan 2023 22:02:40 +0100 Subject: [PATCH 2/8] Bump MSRV to 1.66 --- .github/workflows/push.yml | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index 2d4a1b00..4e7eb283 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -56,7 +56,7 @@ jobs: - ubuntu-latest - windows-latest rust: - - 1.61.0 # MSRV (also change in Cargo.toml) + - 1.66.0 # MSRV (also change in Cargo.toml) - stable - nightly steps: diff --git a/Cargo.toml b/Cargo.toml index 43869809..6d2b3b90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ repository = "https://github.com/magic-wormhole/magic-wormhole.rs" documentation = "https://docs.rs/magic-wormhole/latest/" license = "EUPL-1.2" edition = "2021" -rust-version = "1.61" # MSRV (also change in CI) +rust-version = "1.66" # MSRV (also change in CI) [dependencies] serde = { version = "1.0.120", features = ["rc"] } From 82ad7b0de9205f05fb1338984fff6daa7ef11249 Mon Sep 17 00:00:00 2001 From: piegames Date: Tue, 31 Jan 2023 22:04:38 +0100 Subject: [PATCH 3/8] Dependencies for WASM support --- Cargo.lock | 353 ++++++++++++++++++++++++++++++++------------------- Cargo.toml | 28 +++- changelog.md | 1 + 3 files changed, 245 insertions(+), 137 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c05e8bb1..9f8687fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -229,9 +229,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.59" +version = "0.1.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e6e93155431f3931513b243d371981bb2770112b370c82745a1d19d2f99364" +checksum = "1cd7fce9ba8c3c042128ce72d8b2ddbf3a05747efb67ea0313c635e10bda47a2" dependencies = [ "proc-macro2", "quote", @@ -253,11 +253,22 @@ dependencies = [ "tungstenite", ] +[[package]] +name = "async_io_stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" +dependencies = [ + "futures", + "pharos", + "rustc_version", +] + [[package]] name = "atomic-waker" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" +checksum = "debc29dde2e69f9e47506b525f639ed42300fc014a3e007832592448fa8e4599" [[package]] name = "atty" @@ -311,9 +322,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "blake2" -version = "0.10.5" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b12e5fd123190ce1c2e559308a94c9bacad77907d4c6005d9e58fe1a0689e55e" +checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" dependencies = [ "digest 0.10.6", ] @@ -349,9 +360,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.11.1" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" +checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" [[package]] name = "bytecodec" @@ -377,9 +388,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.3.0" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" +checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" [[package]] name = "cache-padded" @@ -389,9 +400,9 @@ checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c" [[package]] name = "cc" -version = "1.0.78" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a20104e2335ce8a659d6dd92a51a767a0c062599c73b343fd152cb401e828c3d" +checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" [[package]] name = "cfg-if" @@ -453,7 +464,7 @@ dependencies = [ "once_cell", "strsim 0.10.0", "termcolor", - "terminal_size 0.2.3", + "terminal_size", "textwrap", ] @@ -504,9 +515,9 @@ dependencies = [ [[package]] name = "clipboard-win" -version = "4.4.2" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4ab1b92798304eedc095b53942963240037c0516452cb11aeba709d420b2219" +checksum = "7191c27c2357d9b7ef96baac1773290d4ca63b24205b82a3fd8a0637afcf0362" dependencies = [ "error-code", "str-buf", @@ -542,25 +553,24 @@ dependencies = [ [[package]] name = "concurrent-queue" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd7bef69dc86e3c610e4e7aed41035e2a7ed12e72dd7530f61327a6579a4390b" +checksum = "c278839b831783b70278b14df4d45e1beb1aad306c07bb796637de9a0e323e8e" dependencies = [ "crossbeam-utils", ] [[package]] name = "console" -version = "0.15.2" +version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c050367d967ced717c04b65d8c619d863ef9292ce0c5760028655a2fb298718c" +checksum = "c3d79fbe8970a77e3e34151cc13d3b3e248aa0faaecb9f6091fa07ebefe5ad60" dependencies = [ "encode_unicode", "lazy_static", "libc", - "terminal_size 0.1.17", "unicode-width", - "winapi", + "windows-sys", ] [[package]] @@ -606,7 +616,7 @@ dependencies = [ "crossterm_winapi", "libc", "mio", - "parking_lot", + "parking_lot 0.12.1", "signal-hook", "signal-hook-mio", "winapi", @@ -656,7 +666,7 @@ version = "3.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1631ca6e3c59112501a9d87fd86f21591ff77acd31331e8a73f8d80a65bbdd71" dependencies = [ - "nix 0.26.1", + "nix 0.26.2", "windows-sys", ] @@ -732,11 +742,12 @@ dependencies = [ [[package]] name = "dialoguer" -version = "0.10.2" +version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a92e7e37ecef6857fdc0c0c5d42fd5b0938e46590c2183cc92dd310a6d078eb1" +checksum = "af3c796f3b0b408d9fd581611b47fa850821fcb84aa640b83a3c1a5be2d691f2" dependencies = [ "console", + "shell-words", "tempfile", "zeroize", ] @@ -877,9 +888,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0" +checksum = "13e2792b0ff0340399d58445b88fd9770e3489eff258a4cbc1523418f12abf84" dependencies = [ "futures-channel", "futures-core", @@ -892,9 +903,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed" +checksum = "2e5317663a9089767a1ec00a487df42e0ca174b61b4483213ac24448e4664df5" dependencies = [ "futures-core", "futures-sink", @@ -902,15 +913,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" +checksum = "ec90ff4d0fe1f57d600049061dc6bb68ed03c7d2fbd697274c41805dcb3f8608" [[package]] name = "futures-executor" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2" +checksum = "e8de0a35a6ab97ec8869e32a2473f4b1324459e14c29275d14b10cb1fd19b50e" dependencies = [ "futures-core", "futures-task", @@ -919,9 +930,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" +checksum = "bfb8371b6fb2aeb2d280374607aeabfc99d95c72edfe51692e42d3d7f0d08531" [[package]] name = "futures-lite" @@ -940,9 +951,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" +checksum = "95a73af87da33b5acf53acfebdc339fe592ecf5357ac7c0a7734ab9d8c876a70" dependencies = [ "proc-macro2", "quote", @@ -951,21 +962,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9" +checksum = "f310820bb3e8cfd46c80db4d7fb8353e15dfff853a127158425f31e0be6c8364" [[package]] name = "futures-task" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea" +checksum = "dcf79a1bf610b10f42aea489289c5a2c478a786509693b80cd39c44ccd936366" [[package]] name = "futures-util" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" +checksum = "9c1d6de3acfef38d2be4b1f543f553131788603495be83da675e180c8d6b7bd1" dependencies = [ "futures-channel", "futures-core", @@ -1019,6 +1030,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.9.0+wasi-snapshot-preview1", ] @@ -1030,8 +1042,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -1046,15 +1060,15 @@ dependencies = [ [[package]] name = "gimli" -version = "0.27.0" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dec7af912d60cdbd3677c1af9352ebae6fb8394d165568a2234df0fa00f87793" +checksum = "221996f774192f0f718773def8201c4ae31f02616a54ccfc2d358bb0e5cefdec" [[package]] name = "gloo-timers" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98c4a8d6391675c6b2ee1a6c8d06e8e2d03605c44cec1270675985a4c2a5500b" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" dependencies = [ "futures-channel", "futures-core", @@ -1169,12 +1183,12 @@ dependencies = [ [[package]] name = "if-addrs" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbc0fa01ffc752e9dbc72818cdb072cd028b86be5e09dd04c5a643704fe101a9" +checksum = "26b24dd0826eee92c56edcda7ff190f2cf52115c49eadb2c2da8063e2673a8c2" dependencies = [ "libc", - "winapi", + "windows-sys", ] [[package]] @@ -1195,9 +1209,9 @@ dependencies = [ [[package]] name = "indicatif" -version = "0.17.2" +version = "0.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4295cbb7573c16d310e99e713cf9e75101eb190ab31fccd35f2d2691b4352b19" +checksum = "cef509aa9bc73864d6756f0d34d35504af3cf0844373afe9b8669a5b8005a729" dependencies = [ "console", "number_prefix", @@ -1212,13 +1226,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" dependencies = [ "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", ] [[package]] name = "io-lifetimes" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46112a93252b123d31a119a8d1a1ac19deac4fac6e0e8b0df58f0d4e5870e63c" +checksum = "e7d6c6f8c91b4b9ed43484ad1a938e393caf35960fce7f82a040497207bd8e9e" dependencies = [ "libc", "windows-sys", @@ -1226,9 +1243,9 @@ dependencies = [ [[package]] name = "is-terminal" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927609f78c2913a6f6ac3c27a4fe87f43e2a35367c0c4b0f8265e8f49a104330" +checksum = "28dfb6c8100ccc63462345b67d1bbc3679177c75ee4bf59bf29c8b1d110b8189" dependencies = [ "hermit-abi 0.2.6", "io-lifetimes", @@ -1238,9 +1255,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc" +checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440" [[package]] name = "js-sys" @@ -1268,9 +1285,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.138" +version = "0.2.139" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db6d7e329c562c5dfab7a46a2afabc8b987ab9a4834c9d1ca04dc54c1546cef8" +checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" [[package]] name = "linux-raw-sys" @@ -1326,9 +1343,12 @@ dependencies = [ "eyre", "futures", "futures_ringbuf", + "getrandom 0.1.16", + "getrandom 0.2.8", "hex", "hkdf", "if-addrs", + "instant", "libc", "log", "noise-protocol", @@ -1347,6 +1367,8 @@ dependencies = [ "thiserror", "time", "url", + "wasm-timer", + "ws_stream_wasm", "xsalsa20poly1305", ] @@ -1421,9 +1443,9 @@ dependencies = [ [[package]] name = "nix" -version = "0.26.1" +version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46a58d1d356c6597d08cde02c2f09d785b09e28711837b1ed667dc652c08a694" +checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a" dependencies = [ "bitflags", "cfg-if", @@ -1458,9 +1480,9 @@ dependencies = [ [[package]] name = "nom" -version = "7.1.1" +version = "7.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" dependencies = [ "memchr", "minimal-lexical", @@ -1512,18 +1534,18 @@ dependencies = [ [[package]] name = "object" -version = "0.30.0" +version = "0.30.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "239da7f290cfa979f43f85a8efeee9a8a76d0827c356d37f9d3d7254d6b537fb" +checksum = "ea86265d3d3dcb6a27fc51bd29a4bf387fae9d2986b823079d4986af253eb439" dependencies = [ "memchr", ] [[package]] name = "once_cell" -version = "1.16.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860" +checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66" [[package]] name = "opaque-debug" @@ -1559,6 +1581,17 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -1566,14 +1599,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.6", ] [[package]] name = "parking_lot_core" -version = "0.9.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ff9f3fef3968a3ec5945535ed654cb38ff72d7495a25619e2247fb15a2ed9ba" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall", + "smallvec", + "winapi", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba1ef8814b5c993410bb3adfad7a5ed269563e4a2f90c41f5d85be7fb47133bf" dependencies = [ "cfg-if", "libc", @@ -1584,9 +1631,9 @@ dependencies = [ [[package]] name = "paste" -version = "1.0.10" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf1c2c742266c2f1041c914ba65355a83ae8747b05f208319784083583494b4b" +checksum = "d01a5bd0424d00070b0098dd17ebca6f961a959dead1dbcbbbc1d1cd8d3deeba" [[package]] name = "percent-encoding" @@ -1604,6 +1651,16 @@ dependencies = [ "indexmap", ] +[[package]] +name = "pharos" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" +dependencies = [ + "futures", + "rustc_version", +] + [[package]] name = "pin-project" version = "1.0.12" @@ -1681,9 +1738,9 @@ dependencies = [ [[package]] name = "portable-atomic" -version = "0.3.18" +version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81bdd679d533107e090c2704a35982fc06302e30898e63ffa26a81155c012e92" +checksum = "26f6a7b87c2e435a3241addceeeff740ff8b7e76b74c13bf9acb17fa454ea00b" [[package]] name = "ppv-lite86" @@ -1717,9 +1774,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.47" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725" +checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2" dependencies = [ "unicode-ident", ] @@ -1745,9 +1802,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.21" +version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179" +checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" dependencies = [ "proc-macro2", ] @@ -1802,9 +1859,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.7.0" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e076559ef8e241f2ae3479e36f97bd5741c0330689e217ad51ce2c76808b868a" +checksum = "48aaa5748ba571fb95cd2c85c09f629215d3a6ece942baa100950af03a34f733" dependencies = [ "aho-corasick", "memchr", @@ -1889,9 +1946,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.36.5" +version = "0.36.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3807b5d10909833d3e9acd1eb5fb988f79376ff10fce42937de71a449c4c588" +checksum = "d4fdebc4b395b7fbb9ab11e462e20ed9051e7b16e42d24042c776eca0ac81b03" dependencies = [ "bitflags", "errno", @@ -1916,9 +1973,9 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.11" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" +checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde" [[package]] name = "salsa20" @@ -1948,24 +2005,30 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.14" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4" +checksum = "58bc9567378fc7690d6b2addae4e60ac2eeea07becb2c64b9f218b53865cba2a" + +[[package]] +name = "send_wrapper" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" [[package]] name = "serde" -version = "1.0.150" +version = "1.0.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e326c9ec8042f1b5da33252c8a37e9ffbd2c9bef0155215b6e6c80c790e05f91" +checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.150" +version = "1.0.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42a3df25b0713732468deadad63ab9da1f1fd75a48a15024b50363f128db627e" +checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" dependencies = [ "proc-macro2", "quote", @@ -1974,9 +2037,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.89" +version = "1.0.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "020ff22c755c2ed3f8cf162dbb41a7268d934702f3ed3631656ea597e08fc3db" +checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883" dependencies = [ "itoa", "ryu", @@ -2020,6 +2083,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shell-words" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" + [[package]] name = "signal-hook" version = "0.3.14" @@ -2139,9 +2208,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.105" +version = "1.0.107" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60b9b43d45702de4c839cb9b51d9f529c5dd26a4aff255b42b1ebc03e88ee908" +checksum = "1f4064b5b16e03ae50984a5a8ed5d4f8803e6bc1fd170a3cda91a1be4b18e3f5" dependencies = [ "proc-macro2", "quote", @@ -2176,23 +2245,13 @@ dependencies = [ [[package]] name = "termcolor" -version = "1.1.3" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" dependencies = [ "winapi-util", ] -[[package]] -name = "terminal_size" -version = "0.1.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "terminal_size" version = "0.2.3" @@ -2209,23 +2268,23 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" dependencies = [ - "terminal_size 0.2.3", + "terminal_size", ] [[package]] name = "thiserror" -version = "1.0.37" +version = "1.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e" +checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.37" +version = "1.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb" +checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" dependencies = [ "proc-macro2", "quote", @@ -2395,15 +2454,15 @@ checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" [[package]] name = "unicode-bidi" -version = "0.3.8" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" +checksum = "d54675592c1dbefd78cbd98db9bacd89886e1ca50692a0692baefffdeb92dd58" [[package]] name = "unicode-ident" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3" +checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc" [[package]] name = "unicode-normalization" @@ -2566,6 +2625,21 @@ version = "0.2.83" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f" +[[package]] +name = "wasm-timer" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be0ecb0db480561e9a7642b5d3e4187c128914e58aa84330b9493e3eb68c5e7f" +dependencies = [ + "futures", + "js-sys", + "parking_lot 0.11.2", + "pin-utils", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wayland-client" version = "0.29.5" @@ -2720,45 +2794,45 @@ dependencies = [ [[package]] name = "windows_aarch64_gnullvm" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e" +checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608" [[package]] name = "windows_aarch64_msvc" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4" +checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7" [[package]] name = "windows_i686_gnu" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7" +checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640" [[package]] name = "windows_i686_msvc" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246" +checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605" [[package]] name = "windows_x86_64_gnu" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed" +checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45" [[package]] name = "windows_x86_64_gnullvm" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028" +checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463" [[package]] name = "windows_x86_64_msvc" -version = "0.42.0" +version = "0.42.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5" +checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" [[package]] name = "wl-clipboard-rs" @@ -2803,6 +2877,25 @@ dependencies = [ "url", ] +[[package]] +name = "ws_stream_wasm" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7999f5f4217fe3818726b66257a4475f71e74ffd190776ad053fa159e50737f5" +dependencies = [ + "async_io_stream", + "futures", + "js-sys", + "log", + "pharos", + "rustc_version", + "send_wrapper", + "thiserror", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "x11-clipboard" version = "0.7.0" diff --git a/Cargo.toml b/Cargo.toml index 6d2b3b90..0e987745 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,23 +32,18 @@ log = "0.4.13" base64 = "0.20.0" futures_ringbuf = "0.3.1" time = { version = "0.3.7", features = ["formatting"] } +instant = { version = "0.1.12", features = ["wasm-bindgen"] } derive_more = { version = "0.99.0", default-features = false, features = ["display", "deref", "from"] } thiserror = "1.0.24" futures = "0.3.12" -async-std = { version = "1.12.0", features = ["attributes", "unstable"] } -async-tungstenite = { version = "0.17.1", features = ["async-std-runtime", "async-tls"] } -async-io = "1.6.0" -libc = "0.2.101" url = { version = "2.2.2", features = ["serde"] } percent-encoding = { version = "2.1.0" } # Transit dependencies -socket2 = { version = "0.4.1", optional = true } stun_codec = { version = "0.2.0", optional = true } -if-addrs = { version = "0.7.0", optional = true } bytecodec = { version = "0.4.15", optional = true } async-trait = { version = "0.1.57", optional = true } noise-protocol = { version = "0.1.4", optional = true } @@ -56,13 +51,32 @@ noise-rust-crypto = { version = "0.5.0", optional = true } # Transfer dependencies -async-tar = { version = "0.4.2", optional = true } rmp-serde = { version = "1.0.0", optional = true } # Forwarding dependencies # rmp-serde = … # defined above +[target.'cfg(not(target_family = "wasm"))'.dependencies] +libc = "0.2.101" +async-std = { version = "1.12.0", features = ["attributes", "unstable"] } +async-tungstenite = { version = "0.17.1", features = ["async-std-runtime", "async-tls"] } +async-io = "1.6.0" + +# Transit +socket2 = { version = "0.4.1", optional = true } +if-addrs = { version = "0.8.0", optional = true } + +# Transfer + +async-tar = { version = "0.4.2", optional = true } + +[target.'cfg(target_family = "wasm")'.dependencies] +wasm-timer = "0.2.5" +ws_stream_wasm = "0.7.3" +getrandom = { version = "0.2.5", features = ["js"] } +getrandom_2 = { package = "getrandom", version = "0.1.16", features = ["js-sys"] } + # for some tests [dev-dependencies] env_logger = "0.10.0" diff --git a/changelog.md b/changelog.md index 45053253..c5978341 100644 --- a/changelog.md +++ b/changelog.md @@ -2,6 +2,7 @@ ## Unreleased +- Added compilation support for WASM targets. - \[lib\]\[breaking\] replaced `transit::TransitInfo` with a struct containing the address, the old enum has been renamed to `transit::ConnectionType`. ## Version 0.6.0 From d6a75a74cf1d66286ea9e362aeffc0a29e56e7b0 Mon Sep 17 00:00:00 2001 From: piegames Date: Tue, 31 Jan 2023 22:04:46 +0100 Subject: [PATCH 4/8] Core: add WASM support --- src/core/rendezvous.rs | 92 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 88 insertions(+), 4 deletions(-) diff --git a/src/core/rendezvous.rs b/src/core/rendezvous.rs index ec999809..b013877b 100644 --- a/src/core/rendezvous.rs +++ b/src/core/rendezvous.rs @@ -2,6 +2,7 @@ //! //! Wormhole builds upon this, so you usually don't need to bother. +#[cfg(not(target_family = "wasm"))] use async_tungstenite::tungstenite as ws2; use futures::prelude::*; use std::collections::VecDeque; @@ -38,11 +39,19 @@ pub enum RendezvousError { _0 )] Login(Vec), + #[cfg(not(target_family = "wasm"))] #[error("Websocket IO error")] IO( #[from] #[source] - async_tungstenite::tungstenite::Error, + ws2::Error, + ), + #[cfg(target_family = "wasm")] + #[error("Websocket IO error")] + IO( + #[from] + #[source] + ws_stream_wasm::WsErr, ), } @@ -65,11 +74,19 @@ impl RendezvousError { type MessageQueue = VecDeque; +#[cfg(not(target_family = "wasm"))] struct WsConnection { connection: async_tungstenite::WebSocketStream, } +#[cfg(target_family = "wasm")] +struct WsConnection { + connection: ws_stream_wasm::WsStream, + meta: ws_stream_wasm::WsMeta, +} + impl WsConnection { + #[cfg(not(target_family = "wasm"))] async fn send_message( &mut self, message: &OutboundMessage, @@ -83,6 +100,22 @@ impl WsConnection { Ok(()) } + #[cfg(target_family = "wasm")] + async fn send_message( + &mut self, + message: &OutboundMessage, + queue: Option<&mut MessageQueue>, + ) -> Result<(), RendezvousError> { + log::debug!("Sending {:?}", message); + self.connection + .send(ws_stream_wasm::WsMessage::Text( + serde_json::to_string(message).unwrap(), + )) + .await?; + self.receive_ack(queue).await?; + Ok(()) + } + async fn receive_ack( &mut self, mut queue: Option<&mut MessageQueue>, @@ -160,6 +193,7 @@ impl WsConnection { } } + #[cfg(not(target_family = "wasm"))] async fn receive_message(&mut self) -> Result, RendezvousError> { let message = self .connection @@ -195,6 +229,42 @@ impl WsConnection { }, } } + + #[cfg(target_family = "wasm")] + async fn receive_message(&mut self) -> Result, RendezvousError> { + let message = self + .connection + .next() + .await + .expect("TODO this should always be Some"); + match message { + ws_stream_wasm::WsMessage::Text(message_plain) => { + let message = serde_json::from_str(&message_plain)?; + log::debug!("Received {:?}", message); + match message { + InboundMessage::Unknown => { + log::warn!("Got unknown message, ignoring: '{}'", message_plain); + Ok(None) + }, + InboundMessage::Error { error, orig: _ } => Err(RendezvousError::server(error)), + message => Ok(Some(message)), + } + }, + ws_stream_wasm::WsMessage::Binary(_) => Err(RendezvousError::protocol( + "WebSocket messages must be UTF-8 encoded text", + )), + } + } + + #[cfg(not(target_family = "wasm"))] + async fn close(&mut self) -> Result<(), ws2::Error> { + self.connection.close(None).await + } + + #[cfg(target_family = "wasm")] + async fn close(&mut self) -> Result { + self.meta.close().await + } } #[derive(Clone, Debug, derive_more::Display)] @@ -262,8 +332,22 @@ impl RendezvousServer { relay_url: &str, ) -> Result<(Self, Option), RendezvousError> { let side = MySide::generate(); - let (connection, _) = async_tungstenite::async_std::connect_async(relay_url).await?; - let mut connection = WsConnection { connection }; + let mut connection; + + #[cfg(not(target_arch = "wasm32"))] + { + let (stream, _) = async_tungstenite::async_std::connect_async(relay_url).await?; + connection = WsConnection { connection: stream }; + } + + #[cfg(target_arch = "wasm32")] + { + let (meta, stream) = ws_stream_wasm::WsMeta::connect(relay_url, None).await?; + connection = WsConnection { + meta, + connection: stream, + }; + } let welcome = match connection.receive_message_some().await? { InboundMessage::Welcome { welcome } => welcome, @@ -510,7 +594,7 @@ impl RendezvousServer { }; } - self.connection.connection.close(None).await?; + self.connection.close().await?; Ok(()) } } From 0e7ed60e60c0f174131e68ce535561eb14af182d Mon Sep 17 00:00:00 2001 From: piegames Date: Tue, 31 Jan 2023 22:06:47 +0100 Subject: [PATCH 5/8] Transit: add WASM support --- src/transit.rs | 507 +++++++++++++++------------------------ src/transit/crypto.rs | 148 +++++------- src/transit/transport.rs | 304 +++++++++++++++++++++++ 3 files changed, 560 insertions(+), 399 deletions(-) create mode 100644 src/transit/transport.rs diff --git a/src/transit.rs b/src/transit.rs index 5d1838a8..e455dd96 100644 --- a/src/transit.rs +++ b/src/transit.rs @@ -16,29 +16,33 @@ use crate::{Key, KeyPurpose}; use serde_derive::{Deserialize, Serialize}; -use async_std::{ - io::{prelude::WriteExt, ReadExt}, - net::{TcpListener, TcpStream}, -}; -use futures::io::{AsyncRead, AsyncWrite}; +#[cfg(not(target_family = "wasm"))] +use async_std::net::{TcpListener, TcpStream}; #[allow(unused_imports)] /* We need them for the docs */ -use futures::{future::TryFutureExt, Sink, SinkExt, Stream, StreamExt, TryStreamExt}; +use futures::{ + future::FutureExt, + future::TryFutureExt, + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, + Sink, SinkExt, Stream, StreamExt, TryStreamExt, +}; use log::*; use std::{ collections::HashSet, - net::{IpAddr, SocketAddr, ToSocketAddrs}, + net::{IpAddr, SocketAddr}, sync::Arc, }; -use xsalsa20poly1305 as secretbox; -use xsalsa20poly1305::aead::{Aead, NewAead}; mod crypto; +mod transport; +use crypto::TransitHandshakeError; +use transport::{TransitTransport, TransitTransportRx, TransitTransportTx}; /// ULR to a default hosted relay server. Please don't abuse or DOS. pub const DEFAULT_RELAY_SERVER: &str = "tcp://transit.magic-wormhole.io:4001"; // No need to make public, it's hard-coded anyways (: // Open an issue if you want an API for this // Use for non-production testing +#[cfg(not(target_family = "wasm"))] const PUBLIC_STUN_SERVER: &str = "stun.piegames.de:3478"; #[derive(Debug)] @@ -65,6 +69,13 @@ pub enum TransitConnectError { #[source] std::io::Error, ), + #[cfg(target_family = "wasm")] + #[error("WASM error")] + WASM( + #[from] + #[source] + ws_stream_wasm::WsErr, + ), } #[derive(Debug, thiserror::Error)] @@ -80,6 +91,13 @@ pub enum TransitError { #[source] std::io::Error, ), + #[cfg(target_family = "wasm")] + #[error("WASM error")] + WASM( + #[from] + #[source] + ws_stream_wasm::WsErr, + ), } impl From<()> for TransitError { @@ -573,6 +591,7 @@ pub struct TransitInfo { pub conn_type: ConnectionType, /// Target address of our connection. This may be our peer, or the relay server. /// This says nothing about the actual transport protocol used. + #[cfg(not(target_family = "wasm"))] pub peer_addr: SocketAddr, // Prevent exhaustive destructuring for future proofing _unused: (), @@ -580,71 +599,7 @@ pub struct TransitInfo { type TransitConnection = (Box, TransitInfo); -fn set_socket_opts(socket: &socket2::Socket) -> std::io::Result<()> { - socket.set_nonblocking(true)?; - - /* See https://stackoverflow.com/a/14388707/6094756. - * On most BSD and Linux systems, we need both REUSEADDR and REUSEPORT; - * and if they don't support the latter we won't compile. - * On Windows, there is only REUSEADDR but it does what we want. - */ - socket.set_reuse_address(true)?; - #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))] - { - socket.set_reuse_port(true)?; - } - #[cfg(not(any( - all(unix, not(any(target_os = "solaris", target_os = "illumos"))), - target_os = "windows" - )))] - { - compile_error!("Your system is not supported yet, please raise an error"); - } - - Ok(()) -} - -/** - * Bind to a port with SO_REUSEADDR, connect to the destination and then hide the blood behind a pretty [`async_std::net::TcpStream`] - * - * We want an `async_std::net::TcpStream`, but with SO_REUSEADDR set. - * The former is just a wrapper around `async_io::Async`, of which we - * copy the `connect` method to add a statement that will set the socket flag. - * See https://github.com/smol-rs/async-net/issues/20. - */ -async fn connect_custom( - local_addr: &socket2::SockAddr, - dest_addr: &socket2::SockAddr, -) -> std::io::Result { - log::debug!("Binding to {}", local_addr.as_socket().unwrap()); - let socket = socket2::Socket::new(socket2::Domain::IPV6, socket2::Type::STREAM, None)?; - /* Set our custum options */ - set_socket_opts(&socket)?; - - socket.bind(local_addr)?; - - /* Initiate connect */ - match socket.connect(dest_addr) { - Ok(_) => {}, - #[cfg(unix)] - Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}, - Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}, - Err(err) => return Err(err), - } - - let stream = async_io::Async::new(std::net::TcpStream::from(socket))?; - /* The stream becomes writable when connected. */ - stream.writable().await?; - - /* Check if there was an error while connecting. */ - stream - .get_ref() - .take_error() - .and_then(|maybe_err| maybe_err.map_or(Ok(()), Result::Err))?; - /* Convert our mess to `async_std::net::TcpStream */ - Ok(stream.into_inner()?.into()) -} - +#[cfg(not(target_family = "wasm"))] #[derive(Debug, thiserror::Error)] enum StunError { #[error("No IPv4 addresses were found for the selected STUN server")] @@ -667,90 +622,6 @@ enum StunError { ), } -/** Perform a STUN query to get the external IP address */ -async fn get_external_ip() -> Result<(SocketAddr, TcpStream), StunError> { - let mut socket = connect_custom( - &"[::]:0".parse::().unwrap().into(), - &PUBLIC_STUN_SERVER - .to_socket_addrs()? - /* If you find yourself behind a NAT66, open an issue */ - .find(|x| x.is_ipv4()) - /* TODO add a helper method to stdlib for this */ - .map(|addr| match addr { - SocketAddr::V4(v4) => { - SocketAddr::new(IpAddr::V6(v4.ip().to_ipv6_mapped()), v4.port()) - }, - SocketAddr::V6(_) => unreachable!(), - }) - .ok_or(StunError::ServerIsV6Only)? - .into(), - ) - .await?; - - use bytecodec::{DecodeExt, EncodeExt}; - use stun_codec::{ - rfc5389::{ - self, - attributes::{MappedAddress, Software, XorMappedAddress}, - Attribute, - }, - Message, MessageClass, MessageDecoder, MessageEncoder, TransactionId, - }; - - fn get_binding_request() -> Result, bytecodec::Error> { - use rand::Rng; - let random_bytes = rand::thread_rng().gen::<[u8; 12]>(); - - let mut message = Message::new( - MessageClass::Request, - rfc5389::methods::BINDING, - TransactionId::new(random_bytes), - ); - - message.add_attribute(Attribute::Software(Software::new( - "magic-wormhole-rust".to_owned(), - )?)); - - // Encodes the message - let mut encoder = MessageEncoder::new(); - let bytes = encoder.encode_into_bytes(message.clone())?; - Ok(bytes) - } - - fn decode_address(buf: &[u8]) -> Result, bytecodec::Error> { - let mut decoder = MessageDecoder::::new(); - let decoded = decoder.decode_from_bytes(buf)??; - - let external_addr1 = decoded - .get_attribute::() - .map(|x| x.address()); - //let external_addr2 = decoded.get_attribute::().map(|x|x.address()); - let external_addr3 = decoded - .get_attribute::() - .map(|x| x.address()); - let external_addr = external_addr1 - // .or(external_addr2) - .or(external_addr3); - - Ok(external_addr) - } - - /* Connect the plugs */ - - socket.write_all(get_binding_request()?.as_ref()).await?; - - let mut buf = [0u8; 256]; - /* Read header first */ - socket.read_exact(&mut buf[..20]).await?; - let len: u16 = u16::from_be_bytes([buf[2], buf[3]]); - /* Read the rest of the message */ - socket.read_exact(&mut buf[20..][..len as usize]).await?; - let external_addr = - decode_address(&buf[..20 + len as usize])?.ok_or(StunError::ServerNoResponse)?; - - Ok((external_addr, socket)) -} - /// Utility method that logs information of the transit result /// /// Example usage: @@ -769,6 +640,7 @@ async fn get_external_ip() -> Result<(SocketAddr, TcpStream), StunError> { /// # Ok(()) /// # } /// ``` +#[cfg(not(target_family = "wasm"))] pub fn log_transit_connection(info: TransitInfo) { match info.conn_type { ConnectionType::Direct => { @@ -804,22 +676,24 @@ pub async fn init( relay_hints: Vec, ) -> Result { let mut our_hints = Hints::default(); - let mut listener = None; + #[cfg(not(target_family = "wasm"))] + let mut sockets = None; if let Some(peer_abilities) = peer_abilities { abilities = abilities.intersect(&peer_abilities); } /* Detect our IP addresses if the ability is enabled */ + #[cfg(not(target_family = "wasm"))] if abilities.can_direct() { let create_sockets = async { /* Do a STUN query to get our public IP. If it works, we must reuse the same socket (port) * so that we will be NATted to the same port again. If it doesn't, simply bind a new socket * and use that instead. */ - let socket: MaybeConnectedSocket = match async_std::future::timeout( + let socket: MaybeConnectedSocket = match timeout( std::time::Duration::from_secs(4), - get_external_ip(), + transport::tcp_get_external_ip(), ) .await .map_err(|_| StunError::Timeout) @@ -843,7 +717,7 @@ pub async fn init( log::warn!("Failed to get external address via STUN, {}", err); let socket = socket2::Socket::new(socket2::Domain::IPV6, socket2::Type::STREAM, None)?; - set_socket_opts(&socket)?; + transport::set_socket_opts(&socket)?; socket.bind(&"[::]:0".parse::().unwrap().into())?; log::debug!( @@ -861,11 +735,11 @@ pub async fn init( * the port. In theory, we could, but it really confused the kernel to the point * of `accept` calls never returning again. */ - let socket2 = TcpListener::bind("[::]:0").await?; + let listener = TcpListener::bind("[::]:0").await?; /* Find our ports, iterate all our local addresses, combine them with the ports and that's our hints */ let port = socket.local_addr()?.as_socket().unwrap().port(); - let port2 = socket2.local_addr()?.port(); + let port2 = listener.local_addr()?.port(); our_hints.direct_tcp.extend( if_addrs::get_if_addrs()? .iter() @@ -884,12 +758,12 @@ pub async fn init( .into_iter() }), ); - log::debug!("Our socket for listening is {}", socket2.local_addr()?); + log::debug!("Our socket for listening is {}", listener.local_addr()?); - Ok::<_, std::io::Error>((socket, socket2)) + Ok::<_, std::io::Error>((socket, listener)) }; - listener = create_sockets + sockets = create_sockets .await // TODO replace with inspect_err once stable .map_err(|err| { @@ -904,12 +778,15 @@ pub async fn init( } Ok(TransitConnector { - sockets: listener, + #[cfg(not(target_family = "wasm"))] + sockets, our_abilities: abilities, our_hints: Arc::new(our_hints), }) } +/// Bound socket, maybe also connected. Guaranteed to have SO_REUSEADDR. +#[cfg(not(target_family = "wasm"))] #[derive(derive_more::From)] enum MaybeConnectedSocket { #[from] @@ -918,6 +795,7 @@ enum MaybeConnectedSocket { Stream(TcpStream), } +#[cfg(not(target_family = "wasm"))] impl MaybeConnectedSocket { fn local_addr(&self) -> std::io::Result { match &self { @@ -939,6 +817,7 @@ pub struct TransitConnector { * The first socket is the port from which we will start connection attempts. * For in case the user is behind no firewalls, we must also listen to the second socket. */ + #[cfg(not(target_family = "wasm"))] sockets: Option<(MaybeConnectedSocket, TcpListener)>, our_abilities: Abilities, our_hints: Arc, @@ -964,13 +843,14 @@ impl TransitConnector { their_hints: Arc, ) -> Result<(Transit, TransitInfo), TransitConnectError> { let Self { + #[cfg(not(target_family = "wasm"))] sockets, our_abilities, our_hints, } = self; let transit_key = Arc::new(transit_key); - let start = std::time::Instant::now(); + let start = instant::Instant::now(); let mut connection_stream = Box::pin( Self::connect( true, @@ -979,6 +859,7 @@ impl TransitConnector { our_hints, their_abilities, their_hints, + #[cfg(not(target_family = "wasm"))] sockets, ) .filter_map(|result| async { @@ -992,16 +873,14 @@ impl TransitConnector { }), ); - let (mut transit, mut finalizer, mut conn_info) = async_std::future::timeout( - std::time::Duration::from_secs(60), - connection_stream.next(), - ) - .await - .map_err(|_| { - log::debug!("`leader_connect` timed out"); - TransitConnectError::Handshake - })? - .ok_or(TransitConnectError::Handshake)?; + let (mut transit, mut finalizer, mut conn_info) = + timeout(std::time::Duration::from_secs(60), connection_stream.next()) + .await + .map_err(|_| { + log::debug!("`leader_connect` timed out"); + TransitConnectError::Handshake + })? + .ok_or(TransitConnectError::Handshake)?; if conn_info.conn_type != ConnectionType::Direct && our_abilities.can_direct() { log::debug!( @@ -1017,7 +896,7 @@ impl TransitConnector { } else { elapsed.mul_f32(0.3) }; - let _ = async_std::future::timeout(to_wait, async { + let _ = timeout(to_wait, async { while let Some((new_transit, new_finalizer, new_conn_info)) = connection_stream.next().await { @@ -1050,7 +929,6 @@ impl TransitConnector { TransitConnectError::Handshake })?; - // let socket = Box::new(socket) as Box; Ok(( Transit { socket: transit, @@ -1071,6 +949,7 @@ impl TransitConnector { their_hints: Arc, ) -> Result<(Transit, TransitInfo), TransitConnectError> { let Self { + #[cfg(not(target_family = "wasm"))] sockets, our_abilities, our_hints, @@ -1085,6 +964,7 @@ impl TransitConnector { our_hints, their_abilities, their_hints, + #[cfg(not(target_family = "wasm"))] sockets, ) .filter_map(|result| async { @@ -1098,7 +978,7 @@ impl TransitConnector { }), ); - let transit = match async_std::future::timeout( + let transit = match timeout( std::time::Duration::from_secs(60), &mut connection_stream.next(), ) @@ -1112,7 +992,6 @@ impl TransitConnector { log::debug!("`handshake_finalize` failed: {e}"); TransitConnectError::Handshake })?; - // let socket = Box::new(socket) as Box; Ok((Transit { socket, tx, rx }, conn_info)) }, @@ -1146,34 +1025,11 @@ impl TransitConnector { our_hints: Arc, their_abilities: Abilities, their_hints: Arc, - socket: Option<(MaybeConnectedSocket, TcpListener)>, - ) -> impl Stream> + 'static { - /* Take a tcp connection and transform it into a `TransitConnection` (mainly set timeouts) */ - fn wrap_tcp_connection( - socket: TcpStream, - conn_type: ConnectionType, - ) -> Result { - /* Set proper read and write timeouts. This will temporarily set the socket into blocking mode :/ */ - // https://github.com/async-rs/async-std/issues/499 - let socket = std::net::TcpStream::try_from(socket) - .expect("Internal error: this should not fail because we never cloned the socket"); - socket.set_write_timeout(Some(std::time::Duration::from_secs(120)))?; - socket.set_read_timeout(Some(std::time::Duration::from_secs(120)))?; - let socket: TcpStream = socket.into(); - - let info = TransitInfo { - conn_type, - peer_addr: socket - .peer_addr() - .expect("Internal error: socket must be IP"), - _unused: (), - }; - - Ok((Box::new(socket), info)) - } - - /* Have socket => can direct */ - assert!(socket.is_none() || our_abilities.can_direct()); + #[cfg(not(target_family = "wasm"))] sockets: Option<(MaybeConnectedSocket, TcpListener)>, + ) -> impl Stream> + 'static { + /* Have Some(sockets) → Can direct */ + #[cfg(not(target_family = "wasm"))] + assert!(sockets.is_none() || our_abilities.can_direct()); let cryptor = if our_abilities.can_noise_crypto() && their_abilities.can_noise_crypto() { log::debug!("Using noise protocol for encryption"); @@ -1193,17 +1049,25 @@ impl TransitConnector { /* Iterator of futures yielding a connection. They'll be then mapped with the handshake, collected into * a Vec and polled concurrently. */ + #[cfg(not(target_family = "wasm"))] use futures::future::BoxFuture; + #[cfg(target_family = "wasm")] + use futures::future::LocalBoxFuture as BoxFuture; type BoxIterator = Box>; - type ConnectorFuture = - BoxFuture<'static, Result>; + type ConnectorFuture = BoxFuture<'static, Result>; let mut connectors: BoxIterator = Box::new(std::iter::empty()); - /* Create direct connection sockets, if we support it. If peer doesn't support it, their list of hints will - * be empty and no entries will be pushed. - */ - let socket2 = if let Some((socket, socket2)) = socket { - let local_addr = Arc::new(socket.local_addr().unwrap()); + #[cfg(not(target_family = "wasm"))] + let (socket, listener) = sockets.unzip(); + #[cfg(not(target_family = "wasm"))] + if our_abilities.can_direct() && their_abilities.can_direct() { + let local_addr = socket.map(|socket| { + Arc::new( + socket + .local_addr() + .expect("This is guaranteed to be an IP socket"), + ) + }); /* Connect to each hint of the peer */ connectors = Box::new( connectors.chain( @@ -1213,48 +1077,13 @@ impl TransitConnector { .into_iter() /* Nobody should have that many IP addresses, even with NATing */ .take(50) - .map(move |hint| { - let local_addr = local_addr.clone(); - async move { - let dest_addr = SocketAddr::try_from(&hint)?; - log::debug!("Connecting directly to {}", dest_addr); - let socket = connect_custom(&local_addr, &dest_addr.into()).await?; - log::debug!("Connected to {}!", dest_addr); - - wrap_tcp_connection(socket, ConnectionType::Direct) - } - }) + .map(move |hint| transport::connect_tcp_direct(local_addr.clone(), hint)) .map(|fut| Box::pin(fut) as ConnectorFuture), ), ) as BoxIterator; - Some(socket2) - } else if our_abilities.can_direct() { - /* Fallback: We did not manage to bind a listener but we can still connect to the peer's hints */ - connectors = Box::new( - connectors.chain( - their_hints - .direct_tcp - .clone() - .into_iter() - /* Nobody should have that many IP addresses, even with NATing */ - .take(50) - .map(move |hint| async move { - let dest_addr = SocketAddr::try_from(&hint)?; - log::debug!("Connecting directly to {}", dest_addr); - let socket = async_std::net::TcpStream::connect(&dest_addr).await?; - log::debug!("Connected to {}!", dest_addr); - - wrap_tcp_connection(socket, ConnectionType::Direct) - }) - .map(|fut| Box::pin(fut) as ConnectorFuture), - ), - ) as BoxIterator; - None - } else { - None - }; + } - /* Relay hints. Make sure that both sides advertize it, since it is fine to support it without providing own hints. */ + /* Relay hints. Make sure that both sides advertise it, since it is fine to support it without providing own hints. */ if our_abilities.can_relay() && their_abilities.can_relay() { /* Collect intermediate into HashSet for deduplication */ let mut relay_hints = Vec::::new(); @@ -1263,12 +1092,14 @@ impl TransitConnector { hint.merge_into(&mut relay_hints); } - connectors = Box::new( - connectors.chain( + #[cfg(not(target_family = "wasm"))] + { + connectors = Box::new( + connectors.chain( relay_hints .into_iter() /* A hint may have multiple addresses pointing towards the server. This may be multiple - * domain aliases or different ports or an IPv6 or IPv4 address. We only need + * domain aliases or different ports or an IPv6 or IPv4 address. We only need * to connect to one of them, since they are considered equivalent. However, we * also want to be prepared for the rare case of one failing, thus we try to reach * up to three different addresses. To not flood the system with requests, we @@ -1278,43 +1109,83 @@ impl TransitConnector { .flat_map(|hint| { /* If the hint has no name, take the first domain name as fallback */ let name = hint.name - .or_else(|| { - /* Try to parse as IP address. We are only interested in human readable names (the IP address will be printed anyways) */ - hint.tcp.iter() + .or_else(|| { + /* Try to parse as IP address. We are only interested in human readable names (the IP address will be printed anyways) */ + hint.tcp.iter() .filter_map(|hint| match url::Host::parse(&hint.hostname) { Ok(url::Host::Domain(_)) => Some(hint.hostname.clone()), _ => None, }) .next() - }); + }); hint.tcp .into_iter() .take(3) .enumerate() .map(move |(i, h)| (i, h, name.clone())) - }) - .map(|(index, host, name)| async move { - async_std::task::sleep(std::time::Duration::from_secs( - index as u64 * 5, - )) - .await; - log::debug!("Connecting to relay {}", host); - let socket = TcpStream::connect((host.hostname.as_str(), host.port)) - .err_into::() - .await?; - log::debug!("Connected to {}!", host); - - wrap_tcp_connection(socket, ConnectionType::Relay { name }) - }) - .map(|fut| Box::pin(fut) as ConnectorFuture), - ), - ) as BoxIterator; + }) + .map(|(index, host, name)| async move { + sleep(std::time::Duration::from_secs( + index as u64 * 5, + )) + .await; + transport::connect_tcp_relay(host, name).await + }) + .map(|fut| Box::pin(fut) as ConnectorFuture), + ), + ) as BoxIterator; + } + + #[cfg(target_family = "wasm")] + { + connectors = Box::new( + connectors.chain( + relay_hints + .into_iter() + /* A hint may have multiple addresses pointing towards the server. This may be multiple + * domain aliases or different ports or an IPv6 or IPv4 address. We only need + * to connect to one of them, since they are considered equivalent. However, we + * also want to be prepared for the rare case of one failing, thus we try to reach + * up to three different addresses. To not flood the system with requests, we + * start them in a 5 seconds interval spread. If one of them succeeds, the remaining ones + * will be cancelled anyways. Note that a hint might not necessarily be reachable via TCP. + */ + .flat_map(|hint| { + /* If the hint has no name, take the first domain name as fallback */ + let name = hint.name + .or_else(|| { + /* Try to parse as IP address. We are only interested in human readable names (the IP address will be printed anyways) */ + hint.tcp.iter() + .filter_map(|hint| match url::Host::parse(&hint.hostname) { + Ok(url::Host::Domain(_)) => Some(hint.hostname.clone()), + _ => None, + }) + .next() + }); + hint.ws + .into_iter() + .take(3) + .enumerate() + .map(move |(i, u)| (i, u, name.clone())) + }) + .map(|(index, url, name)| async move { + sleep(std::time::Duration::from_secs( + index as u64 * 5, + )) + .await; + transport::connect_ws_relay(url, name).await + }) + .map(|fut| Box::pin(fut) as ConnectorFuture), + ), + ) as BoxIterator; + } } /* Do a handshake on all our found connections */ let transit_key2 = transit_key.clone(); let tside2 = tside.clone(); let cryptor2 = cryptor.clone(); + #[allow(unused_mut)] // For WASM targets let mut connectors = Box::new( connectors .map(move |fut| { @@ -1336,14 +1207,14 @@ impl TransitConnector { } }) .map(|fut| { - Box::pin(fut) - as BoxFuture> + Box::pin(fut) as BoxFuture> }), ) - as BoxIterator>>; + as BoxIterator>>; /* Also listen on some port just in case. */ - if let Some(socket2) = socket2 { + #[cfg(not(target_family = "wasm"))] + if let Some(listener) = listener { connectors = Box::new( connectors.chain( std::iter::once(async move { @@ -1351,9 +1222,9 @@ impl TransitConnector { let tside = tside.clone(); let cryptor = cryptor.clone(); let connect = || async { - let (socket, peer) = socket2.accept().await?; + let (socket, peer) = listener.accept().await?; let (socket, info) = - wrap_tcp_connection(socket, ConnectionType::Direct)?; + transport::wrap_tcp_connection(socket, ConnectionType::Direct)?; log::debug!("Got connection from {}!", peer); let (transit, finalizer) = handshake_exchange( is_leader, @@ -1364,9 +1235,7 @@ impl TransitConnector { transit_key.clone(), ) .await?; - Result::<_, crypto::TransitHandshakeError>::Ok(( - transit, finalizer, info, - )) + Result::<_, TransitHandshakeError>::Ok((transit, finalizer, info)) }; loop { match connect().await { @@ -1382,28 +1251,16 @@ impl TransitConnector { } }) .map(|fut| { - Box::pin(fut) - as BoxFuture> + Box::pin(fut) as BoxFuture> }), ), ) - as BoxIterator>>; + as BoxIterator>>; } connectors.collect::>() } } -/// Trait abstracting our socket used for communicating over the wire. -/// -/// Will be primarily instantiated by either a TCP or web socket. Custom methods -/// will be added in the future. -pub(self) trait TransitTransport: - AsyncRead + AsyncWrite + std::any::Any + Unpin + Send -{ -} - -impl TransitTransport for T where T: AsyncRead + AsyncWrite + std::any::Any + Unpin + Send {} - /** * An established Transit connection. * @@ -1435,14 +1292,13 @@ impl Transit { } /** Convert the transit connection to a [`Stream`]/[`Sink`] pair */ + #[cfg(not(target_family = "wasm"))] pub fn split( self, ) -> ( impl futures::sink::Sink, Error = TransitError>, impl futures::stream::Stream, TransitError>>, ) { - use futures::io::AsyncReadExt; - let (reader, writer) = self.socket.split(); ( futures::sink::unfold( @@ -1489,7 +1345,7 @@ async fn handshake_exchange( Box, Box, ), - crypto::TransitHandshakeError, + TransitHandshakeError, > { if host_type != &ConnectionType::Direct { log::trace!("initiating relay handshake"); @@ -1501,10 +1357,7 @@ async fn handshake_exchange( let mut rx = [0u8; 3]; socket.read_exact(&mut rx).await?; let ok_msg: [u8; 3] = *b"ok\n"; - ensure!( - ok_msg == rx, - crypto::TransitHandshakeError::RelayHandshakeFailed - ); + ensure!(ok_msg == rx, TransitHandshakeError::RelayHandshakeFailed); } let finalizer = if is_leader { @@ -1516,6 +1369,40 @@ async fn handshake_exchange( Ok((socket, finalizer)) } +#[cfg(not(target_family = "wasm"))] +pub(super) async fn sleep(duration: std::time::Duration) { + async_std::task::sleep(duration).await +} + +#[cfg(target_family = "wasm")] +pub(super) async fn sleep(duration: std::time::Duration) { + /* Skip error handling. Waiting is best effort anyways */ + let _ = wasm_timer::Delay::new(duration).await; +} + +#[cfg(not(target_family = "wasm"))] +pub(super) async fn timeout( + duration: std::time::Duration, + future: F, +) -> Result +where + F: futures::Future, +{ + async_std::future::timeout(duration, future).await +} + +#[cfg(target_family = "wasm")] +pub(super) async fn timeout( + duration: std::time::Duration, + future: F, +) -> Result +where + F: futures::Future, +{ + use wasm_timer::TryFutureExt; + future.map(Result::Ok).timeout(duration).await +} + #[cfg(test)] mod test { use super::*; diff --git a/src/transit/crypto.rs b/src/transit/crypto.rs index 6650a07b..1b64c90c 100644 --- a/src/transit/crypto.rs +++ b/src/transit/crypto.rs @@ -1,12 +1,18 @@ -/// Cryptographic backbone of the Transit protocol -/// -/// This handles the encrypted handshakes during connection setup, then provides -/// a simple "encrypt/decrypt" abstraction that will be used for all messages. -use super::*; +//! Cryptographic backbone of the Transit protocol +//! +//! This handles the encrypted handshakes during connection setup, then provides +//! a simple "encrypt/decrypt" abstraction that will be used for all messages. + +use super::{ + TransitError, TransitKey, TransitRxKey, TransitTransport, TransitTransportRx, + TransitTransportTx, TransitTxKey, +}; use crate::Key; use async_trait::async_trait; -use futures::future::BoxFuture; +use futures::{future::BoxFuture, io::AsyncWriteExt}; use std::sync::Arc; +use xsalsa20poly1305 as secretbox; +use xsalsa20poly1305::aead::{Aead, NewAead}; /// Private, because we try multiple handshakes and only /// one needs to succeed @@ -37,6 +43,13 @@ pub(super) enum TransitHandshakeError { #[source] std::io::Error, ), + #[cfg(target_family = "wasm")] + #[error("WASM error")] + WASM( + #[from] + #[source] + ws_stream_wasm::WsErr, + ), } impl From<()> for TransitHandshakeError { @@ -45,51 +58,6 @@ impl From<()> for TransitHandshakeError { } } -/// Helper method for handshake: read a fixed number of bytes and make sure they are as expected -async fn read_expect( - socket: &mut (dyn futures::io::AsyncRead + Unpin + Send), - expected: &[u8], -) -> Result<(), TransitHandshakeError> { - let mut buffer = vec![0u8; expected.len()]; - socket.read_exact(&mut buffer).await?; - ensure!(buffer == expected, TransitHandshakeError::HandshakeFailed); - Ok(()) -} - -/// Helper method: read a four bytes length prefix then the appropriate number of bytes -async fn read_transit_message( - socket: &mut (dyn futures::io::AsyncRead + Unpin + Send), -) -> Result, std::io::Error> { - // 1. read 4 bytes from the stream. This represents the length of the encrypted packet. - let length = { - let mut length_arr: [u8; 4] = [0; 4]; - socket.read_exact(&mut length_arr[..]).await?; - u32::from_be_bytes(length_arr) as usize - }; - - // 2. read that many bytes into an array (or a vector?) - let mut buffer = Vec::with_capacity(length); - let len = socket.take(length as u64).read_to_end(&mut buffer).await?; - use std::io::{Error, ErrorKind}; - ensure!( - len == length, - Error::new(ErrorKind::UnexpectedEof, "failed to read whole message") - ); - Ok(buffer) -} - -/// Helper method: write the message length then the message -async fn write_transit_message( - socket: &mut (dyn futures::io::AsyncWrite + Unpin + Send), - message: &[u8], -) -> Result<(), std::io::Error> { - // send the encrypted record - socket - .write_all(&(message.len() as u32).to_be_bytes()) - .await?; - socket.write_all(message).await -} - /// The Transit protocol has the property that the last message of the handshake is from the leader /// and confirms the usage of that specific connection. This trait represents that specific type state. pub(super) trait TransitCryptoInitFinalizer: Send { @@ -140,7 +108,7 @@ pub struct SecretboxInit { impl TransitCryptoInit for SecretboxInit { async fn handshake_leader( &self, - mut socket: &mut dyn TransitTransport, + socket: &mut dyn TransitTransport, ) -> Result, TransitHandshakeError> { // 9. create record keys let rkey = self @@ -171,7 +139,7 @@ impl TransitCryptoInit for SecretboxInit { .to_hex() ); assert_eq!(expected_rx_handshake.len(), 89); - read_expect(&mut socket, expected_rx_handshake.as_bytes()).await?; + socket.read_expect(expected_rx_handshake.as_bytes()).await?; struct Finalizer { skey: Key, @@ -205,7 +173,7 @@ impl TransitCryptoInit for SecretboxInit { async fn handshake_follower( &self, - mut socket: &mut dyn TransitTransport, + socket: &mut dyn TransitTransport, ) -> Result, TransitHandshakeError> { // 9. create record keys /* The order here is correct. The "sender" and "receiver" side are a misnomer and should be called @@ -240,7 +208,7 @@ impl TransitCryptoInit for SecretboxInit { .to_hex(), ); assert_eq!(expected_tx_handshake.len(), 90); - read_expect(&mut socket, expected_tx_handshake.as_bytes()).await?; + socket.read_expect(expected_tx_handshake.as_bytes()).await?; Ok(Box::new(( Box::new(SecretboxCryptoEncrypt { @@ -279,16 +247,14 @@ pub struct NoiseInit { impl TransitCryptoInit for NoiseInit { async fn handshake_leader( &self, - mut socket: &mut dyn TransitTransport, + socket: &mut dyn TransitTransport, ) -> Result, TransitHandshakeError> { socket .write_all(b"Magic-Wormhole Dilation Handshake v1 Leader\n\n") .await?; - read_expect( - &mut socket, - b"Magic-Wormhole Dilation Handshake v1 Follower\n\n", - ) - .await?; + socket + .read_expect(b"Magic-Wormhole Dilation Handshake v1 Follower\n\n") + .await?; let mut handshake: NoiseHandshakeState = { let mut builder = noise_protocol::HandshakeStateBuilder::new(); @@ -300,17 +266,18 @@ impl TransitCryptoInit for NoiseInit { handshake.push_psk(&*self.key); // → psk, e - write_transit_message(&mut socket, &handshake.write_message_vec(&[])?).await?; + socket + .write_transit_message(&handshake.write_message_vec(&[])?) + .await?; // ← e, ee - handshake.read_message(&read_transit_message(&mut socket).await?, &mut [])?; + handshake.read_message(&socket.read_transit_message().await?, &mut [])?; assert!(handshake.completed()); let (tx, mut rx) = handshake.get_ciphers(); // ← "" - let peer_confirmation_message = - rx.decrypt_vec(&read_transit_message(&mut socket).await?)?; + let peer_confirmation_message = rx.decrypt_vec(&socket.read_transit_message().await?)?; ensure!( peer_confirmation_message.len() == 0, TransitHandshakeError::HandshakeFailed @@ -324,11 +291,13 @@ impl TransitCryptoInit for NoiseInit { impl TransitCryptoInitFinalizer for Finalizer { fn handshake_finalize( mut self: Box, - mut socket: &mut dyn TransitTransport, + socket: &mut dyn TransitTransport, ) -> BoxFuture> { Box::pin(async move { // → "" - write_transit_message(&mut socket, &self.tx.encrypt_vec(&[])).await?; + socket + .write_transit_message(&self.tx.encrypt_vec(&[])) + .await?; Ok::<_, TransitHandshakeError>(( Box::new(NoiseCryptoEncrypt { tx: self.tx }) @@ -345,16 +314,14 @@ impl TransitCryptoInit for NoiseInit { async fn handshake_follower( &self, - mut socket: &mut dyn TransitTransport, + socket: &mut dyn TransitTransport, ) -> Result, TransitHandshakeError> { socket .write_all(b"Magic-Wormhole Dilation Handshake v1 Follower\n\n") .await?; - read_expect( - &mut socket, - b"Magic-Wormhole Dilation Handshake v1 Leader\n\n", - ) - .await?; + socket + .read_expect(b"Magic-Wormhole Dilation Handshake v1 Leader\n\n") + .await?; let mut handshake: NoiseHandshakeState = { let mut builder = noise_protocol::HandshakeStateBuilder::new(); @@ -366,21 +333,22 @@ impl TransitCryptoInit for NoiseInit { handshake.push_psk(&*self.key); // ← psk, e - handshake.read_message(&read_transit_message(&mut socket).await?, &mut [])?; + handshake.read_message(&socket.read_transit_message().await?, &mut [])?; // → e, ee - write_transit_message(&mut socket, &handshake.write_message_vec(&[])?).await?; + socket + .write_transit_message(&handshake.write_message_vec(&[])?) + .await?; assert!(handshake.completed()); // Warning: rx and tx are swapped here (read the `get_ciphers` doc carefully) let (mut rx, mut tx) = handshake.get_ciphers(); // → "" - write_transit_message(&mut socket, &tx.encrypt_vec(&[])).await?; + socket.write_transit_message(&tx.encrypt_vec(&[])).await?; // ← "" - let peer_confirmation_message = - rx.decrypt_vec(&read_transit_message(&mut socket).await?)?; + let peer_confirmation_message = rx.decrypt_vec(&socket.read_transit_message().await?)?; ensure!( peer_confirmation_message.len() == 0, TransitHandshakeError::HandshakeFailed @@ -396,19 +364,19 @@ impl TransitCryptoInit for NoiseInit { type DynTransitCrypto = (Box, Box); #[async_trait] -pub trait TransitCryptoEncrypt: Send { +pub(super) trait TransitCryptoEncrypt: Send { async fn encrypt( &mut self, - socket: &mut (dyn futures::io::AsyncWrite + Unpin + Send), + socket: &mut dyn TransitTransportTx, plaintext: &[u8], ) -> Result<(), TransitError>; } #[async_trait] -pub trait TransitCryptoDecrypt: Send { +pub(super) trait TransitCryptoDecrypt: Send { async fn decrypt( &mut self, - socket: &mut (dyn futures::io::AsyncRead + Unpin + Send), + socket: &mut dyn TransitTransportRx, ) -> Result, TransitError>; } @@ -434,7 +402,7 @@ struct SecretboxCryptoDecrypt { impl TransitCryptoEncrypt for SecretboxCryptoEncrypt { async fn encrypt( &mut self, - socket: &mut (dyn futures::io::AsyncWrite + Unpin + Send), + socket: &mut dyn TransitTransportTx, plaintext: &[u8], ) -> Result<(), TransitError> { let nonce = &mut self.snonce; @@ -467,11 +435,11 @@ impl TransitCryptoEncrypt for SecretboxCryptoEncrypt { impl TransitCryptoDecrypt for SecretboxCryptoDecrypt { async fn decrypt( &mut self, - socket: &mut (dyn futures::io::AsyncRead + Unpin + Send), + socket: &mut dyn TransitTransportRx, ) -> Result, TransitError> { let nonce = &mut self.rnonce; - let enc_packet = read_transit_message(socket).await?; + let enc_packet = socket.read_transit_message().await?; use std::io::{Error, ErrorKind}; ensure!( @@ -518,10 +486,12 @@ struct NoiseCryptoDecrypt { impl TransitCryptoEncrypt for NoiseCryptoEncrypt { async fn encrypt( &mut self, - socket: &mut (dyn futures::io::AsyncWrite + Unpin + Send), + socket: &mut dyn TransitTransportTx, plaintext: &[u8], ) -> Result<(), TransitError> { - write_transit_message(socket, &self.tx.encrypt_vec(plaintext)).await?; + socket + .write_transit_message(&self.tx.encrypt_vec(plaintext)) + .await?; Ok(()) } } @@ -530,9 +500,9 @@ impl TransitCryptoEncrypt for NoiseCryptoEncrypt { impl TransitCryptoDecrypt for NoiseCryptoDecrypt { async fn decrypt( &mut self, - socket: &mut (dyn futures::io::AsyncRead + Unpin + Send), + socket: &mut dyn TransitTransportRx, ) -> Result, TransitError> { - let plaintext = self.rx.decrypt_vec(&read_transit_message(socket).await?)?; + let plaintext = self.rx.decrypt_vec(&socket.read_transit_message().await?)?; Ok(plaintext.into_boxed_slice()) } } diff --git a/src/transit/transport.rs b/src/transit/transport.rs new file mode 100644 index 00000000..1e3839ec --- /dev/null +++ b/src/transit/transport.rs @@ -0,0 +1,304 @@ +//! Helper functions abstracting away different transport protocols for Transit + +use super::{ConnectionType, TransitConnection, TransitHandshakeError, TransitInfo}; +#[cfg(not(target_family = "wasm"))] +use super::{DirectHint, StunError}; + +#[cfg(not(target_family = "wasm"))] +use async_std::net::TcpStream; +use async_trait::async_trait; +use futures::{ + future::TryFutureExt, + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, +}; +#[cfg(not(target_family = "wasm"))] +use std::{ + net::{IpAddr, SocketAddr, ToSocketAddrs}, + sync::Arc, +}; + +#[async_trait] +pub(super) trait TransitTransportRx: AsyncRead + std::any::Any + Unpin + Send { + /// Helper method for handshake: read a fixed number of bytes and make sure they are as expected + async fn read_expect(&mut self, expected: &[u8]) -> Result<(), TransitHandshakeError> { + let mut buffer = vec![0u8; expected.len()]; + self.read_exact(&mut buffer).await?; + ensure!(buffer == expected, TransitHandshakeError::HandshakeFailed); + Ok(()) + } + + /// Helper method: read a four bytes length prefix then the appropriate number of bytes + async fn read_transit_message(&mut self) -> Result, std::io::Error> { + // 1. read 4 bytes from the stream. This represents the length of the encrypted packet. + let length = { + let mut length_arr: [u8; 4] = [0; 4]; + self.read_exact(&mut length_arr[..]).await?; + u32::from_be_bytes(length_arr) as usize + }; + + // 2. read that many bytes into an array (or a vector?) + let mut buffer = Vec::with_capacity(length); + let len = self.take(length as u64).read_to_end(&mut buffer).await?; + use std::io::{Error, ErrorKind}; + ensure!( + len == length, + Error::new(ErrorKind::UnexpectedEof, "failed to read whole message") + ); + Ok(buffer) + } +} + +#[async_trait] +pub(super) trait TransitTransportTx: AsyncWrite + std::any::Any + Unpin + Send { + /// Helper method: write the message length then the message + async fn write_transit_message(&mut self, message: &[u8]) -> Result<(), std::io::Error> { + // send the encrypted record + self.write_all(&(message.len() as u32).to_be_bytes()) + .await?; + self.write_all(message).await + } +} + +/// Trait abstracting our socket used for communicating over the wire. +/// +/// Will be primarily instantiated by either a TCP or web socket. Custom methods +/// will be added in the future. +pub(super) trait TransitTransport: TransitTransportRx + TransitTransportTx {} + +impl TransitTransportRx for T where T: AsyncRead + std::any::Any + Unpin + Send {} +impl TransitTransportTx for T where T: AsyncWrite + std::any::Any + Unpin + Send {} +impl TransitTransport for T where T: AsyncRead + AsyncWrite + std::any::Any + Unpin + Send {} + +#[cfg(not(target_family = "wasm"))] +pub(super) fn set_socket_opts(socket: &socket2::Socket) -> std::io::Result<()> { + socket.set_nonblocking(true)?; + + /* See https://stackoverflow.com/a/14388707/6094756. + * On most BSD and Linux systems, we need both REUSEADDR and REUSEPORT; + * and if they don't support the latter we won't compile. + * On Windows, there is only REUSEADDR but it does what we want. + */ + socket.set_reuse_address(true)?; + #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))] + { + socket.set_reuse_port(true)?; + } + #[cfg(not(any( + all(unix, not(any(target_os = "solaris", target_os = "illumos"))), + target_os = "windows" + )))] + { + compile_error!("Your system is not supported yet, please raise an error"); + } + + Ok(()) +} + +/** Perform a STUN query to get the external IP address */ +#[cfg(not(target_family = "wasm"))] +pub(super) async fn tcp_get_external_ip() -> Result<(SocketAddr, TcpStream), StunError> { + let mut socket = tcp_connect_custom( + &"[::]:0".parse::().unwrap().into(), + &super::PUBLIC_STUN_SERVER + .to_socket_addrs()? + /* If you find yourself behind a NAT66, open an issue */ + .find(|x| x.is_ipv4()) + /* TODO add a helper method to stdlib for this */ + .map(|addr| match addr { + SocketAddr::V4(v4) => { + SocketAddr::new(IpAddr::V6(v4.ip().to_ipv6_mapped()), v4.port()) + }, + SocketAddr::V6(_) => unreachable!(), + }) + .ok_or(StunError::ServerIsV6Only)? + .into(), + ) + .await?; + + use bytecodec::{DecodeExt, EncodeExt}; + use stun_codec::{ + rfc5389::{ + self, + attributes::{MappedAddress, Software, XorMappedAddress}, + Attribute, + }, + Message, MessageClass, MessageDecoder, MessageEncoder, TransactionId, + }; + + fn get_binding_request() -> Result, bytecodec::Error> { + use rand::Rng; + let random_bytes = rand::thread_rng().gen::<[u8; 12]>(); + + let mut message = Message::new( + MessageClass::Request, + rfc5389::methods::BINDING, + TransactionId::new(random_bytes), + ); + + message.add_attribute(Attribute::Software(Software::new( + "magic-wormhole-rust".to_owned(), + )?)); + + // Encodes the message + let mut encoder = MessageEncoder::new(); + let bytes = encoder.encode_into_bytes(message.clone())?; + Ok(bytes) + } + + fn decode_address(buf: &[u8]) -> Result, bytecodec::Error> { + let mut decoder = MessageDecoder::::new(); + let decoded = decoder.decode_from_bytes(buf)??; + + let external_addr1 = decoded + .get_attribute::() + .map(|x| x.address()); + //let external_addr2 = decoded.get_attribute::().map(|x|x.address()); + let external_addr3 = decoded + .get_attribute::() + .map(|x| x.address()); + let external_addr = external_addr1 + // .or(external_addr2) + .or(external_addr3); + + Ok(external_addr) + } + + /* Connect the plugs */ + + socket.write_all(get_binding_request()?.as_ref()).await?; + + let mut buf = [0u8; 256]; + /* Read header first */ + socket.read_exact(&mut buf[..20]).await?; + let len: u16 = u16::from_be_bytes([buf[2], buf[3]]); + /* Read the rest of the message */ + socket.read_exact(&mut buf[20..][..len as usize]).await?; + let external_addr = + decode_address(&buf[..20 + len as usize])?.ok_or(StunError::ServerNoResponse)?; + + Ok((external_addr, socket)) +} + +/** + * Bind to a port with SO_REUSEADDR, connect to the destination and then hide the blood behind a pretty [`async_std::net::TcpStream`] + * + * We want an `async_std::net::TcpStream`, but with SO_REUSEADDR set. + * The former is just a wrapper around `async_io::Async`, of which we + * copy the `connect` method to add a statement that will set the socket flag. + * See https://github.com/smol-rs/async-net/issues/20. + */ +#[cfg(not(target_family = "wasm"))] +async fn tcp_connect_custom( + local_addr: &socket2::SockAddr, + dest_addr: &socket2::SockAddr, +) -> std::io::Result { + log::debug!("Binding to {}", local_addr.as_socket().unwrap()); + let socket = socket2::Socket::new(socket2::Domain::IPV6, socket2::Type::STREAM, None)?; + /* Set our custum options */ + set_socket_opts(&socket)?; + + socket.bind(local_addr)?; + + /* Initiate connect */ + match socket.connect(dest_addr) { + Ok(_) => {}, + #[cfg(unix)] + Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}, + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}, + Err(err) => return Err(err), + } + + let stream = async_io::Async::new(std::net::TcpStream::from(socket))?; + /* The stream becomes writable when connected. */ + stream.writable().await?; + + /* Check if there was an error while connecting. */ + stream + .get_ref() + .take_error() + .and_then(|maybe_err| maybe_err.map_or(Ok(()), Result::Err))?; + /* Convert our mess to `async_std::net::TcpStream */ + Ok(stream.into_inner()?.into()) +} + +#[cfg(not(target_family = "wasm"))] +pub(super) async fn connect_tcp_direct( + local_addr: Option>, + hint: DirectHint, +) -> Result { + let dest_addr = SocketAddr::try_from(&hint)?; + log::debug!("Connecting directly to {}", dest_addr); + let socket; + + if let Some(local_addr) = local_addr { + socket = tcp_connect_custom(&local_addr, &dest_addr.into()).await?; + log::debug!("Connected to {}!", dest_addr); + } else { + socket = async_std::net::TcpStream::connect(&dest_addr).await?; + log::debug!("Connected to {}!", dest_addr); + } + + wrap_tcp_connection(socket, ConnectionType::Direct) +} + +/* Take a relay hint and try to connect to it */ +#[cfg(not(target_family = "wasm"))] +pub(super) async fn connect_tcp_relay( + host: DirectHint, + name: Option, +) -> Result { + log::debug!("Connecting to relay {}", host); + let socket = TcpStream::connect((host.hostname.as_str(), host.port)) + .err_into::() + .await?; + log::debug!("Connected to {}!", host); + + wrap_tcp_connection(socket, ConnectionType::Relay { name }) +} + +#[cfg(target_family = "wasm")] +pub(super) async fn connect_ws_relay( + url: url::Url, + name: Option, +) -> Result { + log::debug!("Connecting to relay {}", url); + let (_meta, transit) = ws_stream_wasm::WsMeta::connect(&url, None) + .err_into::() + .await?; + log::debug!("Connected to {}!", url); + + let transit = Box::new(transit.into_io()) as Box; + + Ok(( + transit, + TransitInfo { + conn_type: ConnectionType::Relay { name }, + _unused: (), + }, + )) +} + +/* Take a tcp connection and transform it into a `TransitConnection` (mainly set timeouts) */ +#[cfg(not(target_family = "wasm"))] +pub(super) fn wrap_tcp_connection( + socket: TcpStream, + conn_type: ConnectionType, +) -> Result { + /* Set proper read and write timeouts. This will temporarily set the socket into blocking mode :/ */ + // https://github.com/async-rs/async-std/issues/499 + let socket = std::net::TcpStream::try_from(socket) + .expect("Internal error: this should not fail because we never cloned the socket"); + socket.set_write_timeout(Some(std::time::Duration::from_secs(120)))?; + socket.set_read_timeout(Some(std::time::Duration::from_secs(120)))?; + let socket: TcpStream = socket.into(); + + let info = TransitInfo { + conn_type, + peer_addr: socket + .peer_addr() + .expect("Internal error: socket must be IP"), + _unused: (), + }; + + Ok((Box::new(socket), info)) +} From e18e0301cafb13c73ccd8d8e180704adf9b871d8 Mon Sep 17 00:00:00 2001 From: piegames Date: Tue, 31 Jan 2023 22:07:01 +0100 Subject: [PATCH 6/8] Transfer: add WASM support --- src/transfer.rs | 6 ++++-- src/transfer/v1.rs | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/transfer.rs b/src/transfer.rs index 36f57437..a3fee776 100644 --- a/src/transfer.rs +++ b/src/transfer.rs @@ -201,6 +201,7 @@ impl TransitAck { } } +#[cfg(not(target_family = "wasm"))] pub async fn send_file_or_folder( wormhole: Wormhole, relay_hints: Vec, @@ -299,6 +300,7 @@ where /// This isn't a proper folder transfer as per the Wormhole protocol /// because it sends it in a way so that the receiver still has to manually /// unpack it. But it's better than nothing +#[cfg(not(target_family = "wasm"))] pub async fn send_folder( wormhole: Wormhole, relay_hints: Vec, @@ -515,7 +517,7 @@ async fn handle_run_result( result: Result<(Result<(), TransferError>, impl Future), crate::util::Cancelled>, ) -> Result<(), TransferError> { async fn wrap_timeout(run: impl Future, cancel: impl Future) { - let run = async_std::future::timeout(SHUTDOWN_TIME, run); + let run = transit::timeout(SHUTDOWN_TIME, run); futures::pin_mut!(run); match crate::util::cancellable(run, cancel).await { Ok(Ok(())) => {}, @@ -571,7 +573,7 @@ async fn handle_run_result( // and we should not only look for the next one but all have been received // and we should not interrupt a receive operation without making sure it leaves the connection // in a consistent state, otherwise the shutdown may cause protocol errors - if let Ok(Ok(Ok(PeerMessage::Error(e)))) = async_std::future::timeout(SHUTDOWN_TIME / 3, wormhole.receive_json()).await { + if let Ok(Ok(Ok(PeerMessage::Error(e)))) = transit::timeout(SHUTDOWN_TIME / 3, wormhole.receive_json()).await { error = TransferError::PeerError(e); } else { log::debug!("Failed to retrieve more specific error message from peer. Maybe it crashed?"); diff --git a/src/transfer/v1.rs b/src/transfer/v1.rs index be758490..15e31c17 100644 --- a/src/transfer/v1.rs +++ b/src/transfer/v1.rs @@ -108,6 +108,7 @@ where super::handle_run_result(wormhole, result).await } +#[cfg(not(target_family = "wasm"))] pub async fn send_folder( mut wormhole: Wormhole, relay_hints: Vec, From eab91a5d6d656d0613443a98e4bc7241d084ba05 Mon Sep 17 00:00:00 2001 From: piegames Date: Tue, 31 Jan 2023 22:07:25 +0100 Subject: [PATCH 7/8] CI: Test WASM target --- .github/workflows/push.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index 4e7eb283..b233dc67 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -66,6 +66,7 @@ jobs: profile: minimal toolchain: ${{ matrix.rust }} override: true + target: wasm32-unknown-unknown - name: Cache ~/.cargo uses: actions/cache@v1 with: @@ -101,6 +102,11 @@ jobs: with: command: build args: --all-targets + - name: build WASM + uses: actions-rs/cargo@v1 + with: + command: build + args: --target wasm32-unknown-unknown --no-default-features --package magic-wormhole --features transit --features transfer - name: test uses: actions-rs/cargo@v1 with: From 485bf1962f200314dd53bb01b9bcc21c4c129262 Mon Sep 17 00:00:00 2001 From: piegames Date: Sun, 5 Feb 2023 21:23:16 +0100 Subject: [PATCH 8/8] Move some helper functions to `util` module --- src/transfer.rs | 6 +++--- src/transit.rs | 48 +++++++----------------------------------------- src/util.rs | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 43 insertions(+), 44 deletions(-) diff --git a/src/transfer.rs b/src/transfer.rs index a3fee776..8a0aeace 100644 --- a/src/transfer.rs +++ b/src/transfer.rs @@ -14,7 +14,7 @@ use serde_derive::{Deserialize, Serialize}; use serde_json::json; use std::sync::Arc; -use super::{core::WormholeError, transit, transit::Transit, AppID, Wormhole}; +use super::{core::WormholeError, transit, transit::Transit, util, AppID, Wormhole}; use futures::Future; use log::*; use std::{borrow::Cow, path::PathBuf}; @@ -517,7 +517,7 @@ async fn handle_run_result( result: Result<(Result<(), TransferError>, impl Future), crate::util::Cancelled>, ) -> Result<(), TransferError> { async fn wrap_timeout(run: impl Future, cancel: impl Future) { - let run = transit::timeout(SHUTDOWN_TIME, run); + let run = util::timeout(SHUTDOWN_TIME, run); futures::pin_mut!(run); match crate::util::cancellable(run, cancel).await { Ok(Ok(())) => {}, @@ -573,7 +573,7 @@ async fn handle_run_result( // and we should not only look for the next one but all have been received // and we should not interrupt a receive operation without making sure it leaves the connection // in a consistent state, otherwise the shutdown may cause protocol errors - if let Ok(Ok(Ok(PeerMessage::Error(e)))) = transit::timeout(SHUTDOWN_TIME / 3, wormhole.receive_json()).await { + if let Ok(Ok(Ok(PeerMessage::Error(e)))) = util::timeout(SHUTDOWN_TIME / 3, wormhole.receive_json()).await { error = TransferError::PeerError(e); } else { log::debug!("Failed to retrieve more specific error message from peer. Maybe it crashed?"); diff --git a/src/transit.rs b/src/transit.rs index e455dd96..a7678b02 100644 --- a/src/transit.rs +++ b/src/transit.rs @@ -13,7 +13,7 @@ //! **Notice:** while the resulting TCP connection is naturally bi-directional, the handshake is not symmetric. There *must* be one //! "leader" side and one "follower" side (formerly called "sender" and "receiver"). -use crate::{Key, KeyPurpose}; +use crate::{util, Key, KeyPurpose}; use serde_derive::{Deserialize, Serialize}; #[cfg(not(target_family = "wasm"))] @@ -691,7 +691,7 @@ pub async fn init( * so that we will be NATted to the same port again. If it doesn't, simply bind a new socket * and use that instead. */ - let socket: MaybeConnectedSocket = match timeout( + let socket: MaybeConnectedSocket = match util::timeout( std::time::Duration::from_secs(4), transport::tcp_get_external_ip(), ) @@ -874,7 +874,7 @@ impl TransitConnector { ); let (mut transit, mut finalizer, mut conn_info) = - timeout(std::time::Duration::from_secs(60), connection_stream.next()) + util::timeout(std::time::Duration::from_secs(60), connection_stream.next()) .await .map_err(|_| { log::debug!("`leader_connect` timed out"); @@ -896,7 +896,7 @@ impl TransitConnector { } else { elapsed.mul_f32(0.3) }; - let _ = timeout(to_wait, async { + let _ = util::timeout(to_wait, async { while let Some((new_transit, new_finalizer, new_conn_info)) = connection_stream.next().await { @@ -978,7 +978,7 @@ impl TransitConnector { }), ); - let transit = match timeout( + let transit = match util::timeout( std::time::Duration::from_secs(60), &mut connection_stream.next(), ) @@ -1125,7 +1125,7 @@ impl TransitConnector { .map(move |(i, h)| (i, h, name.clone())) }) .map(|(index, host, name)| async move { - sleep(std::time::Duration::from_secs( + util::sleep(std::time::Duration::from_secs( index as u64 * 5, )) .await; @@ -1169,7 +1169,7 @@ impl TransitConnector { .map(move |(i, u)| (i, u, name.clone())) }) .map(|(index, url, name)| async move { - sleep(std::time::Duration::from_secs( + util::sleep(std::time::Duration::from_secs( index as u64 * 5, )) .await; @@ -1369,40 +1369,6 @@ async fn handshake_exchange( Ok((socket, finalizer)) } -#[cfg(not(target_family = "wasm"))] -pub(super) async fn sleep(duration: std::time::Duration) { - async_std::task::sleep(duration).await -} - -#[cfg(target_family = "wasm")] -pub(super) async fn sleep(duration: std::time::Duration) { - /* Skip error handling. Waiting is best effort anyways */ - let _ = wasm_timer::Delay::new(duration).await; -} - -#[cfg(not(target_family = "wasm"))] -pub(super) async fn timeout( - duration: std::time::Duration, - future: F, -) -> Result -where - F: futures::Future, -{ - async_std::future::timeout(duration, future).await -} - -#[cfg(target_family = "wasm")] -pub(super) async fn timeout( - duration: std::time::Duration, - future: F, -) -> Result -where - F: futures::Future, -{ - use wasm_timer::TryFutureExt; - future.map(Result::Ok).timeout(duration).await -} - #[cfg(test)] mod test { use super::*; diff --git a/src/util.rs b/src/util.rs index dc7148b6..9ac5be1b 100644 --- a/src/util.rs +++ b/src/util.rs @@ -79,6 +79,7 @@ impl std::fmt::Display for DisplayBytes<'_> { * TODO remove after https://github.com/quininer/memsec/issues/11 is resolved. * Original implementation: https://github.com/jedisct1/libsodium/blob/6d566070b48efd2fa099bbe9822914455150aba9/src/libsodium/sodium/utils.c#L262-L307 */ +#[allow(unused)] pub fn sodium_increment_le(n: &mut [u8]) { let mut c = 1u16; for b in n { @@ -209,3 +210,35 @@ impl std::fmt::Display for Cancelled { write!(f, "Task has been cancelled") } } + +#[cfg(not(target_family = "wasm"))] +pub async fn sleep(duration: std::time::Duration) { + async_std::task::sleep(duration).await +} + +#[cfg(target_family = "wasm")] +pub async fn sleep(duration: std::time::Duration) { + /* Skip error handling. Waiting is best effort anyways */ + let _ = wasm_timer::Delay::new(duration).await; +} + +#[cfg(not(target_family = "wasm"))] +pub async fn timeout( + duration: std::time::Duration, + future: F, +) -> Result +where + F: futures::Future, +{ + async_std::future::timeout(duration, future).await +} + +#[cfg(target_family = "wasm")] +pub async fn timeout(duration: std::time::Duration, future: F) -> Result +where + F: futures::Future, +{ + use futures::FutureExt; + use wasm_timer::TryFutureExt; + future.map(Result::Ok).timeout(duration).await +}