diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index ef7759b2..c43cec3a 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -2,7 +2,7 @@ name: Rust on: push: - branches: [master, dev, dilation-1] + branches: [master, dev] pull_request: branches: [master, dev] @@ -97,11 +97,6 @@ jobs: with: command: build args: -p magic-wormhole --no-default-features --features=forwarding - - name: build library (features=dilation) - uses: actions-rs/cargo@v1 - with: - command: build - args: -p magic-wormhole --no-default-features --features=dilation - name: build CLI uses: actions-rs/cargo@v1 with: diff --git a/Cargo.lock b/Cargo.lock index 0bbb34b1..18b147a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -948,12 +948,6 @@ dependencies = [ "libloading", ] -[[package]] -name = "downcast" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" - [[package]] name = "downcast-rs" version = "1.2.0" @@ -1180,12 +1174,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fragile" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" - [[package]] name = "futures" version = "0.3.30" @@ -1672,8 +1660,6 @@ dependencies = [ "instant", "libc", "log", - "mockall", - "mockall_double", "noise-protocol", "noise-rust-crypto", "percent-encoding", @@ -1754,45 +1740,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "mockall" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43766c2b5203b10de348ffe19f7e54564b64f3d6018ff7648d1e2d6d3a0f0a48" -dependencies = [ - "cfg-if", - "downcast", - "fragile", - "lazy_static", - "mockall_derive", - "predicates", - "predicates-tree", -] - -[[package]] -name = "mockall_derive" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af7cbce79ec385a1d4f54baa90a76401eb15d9cab93685f62e7e9f942aa00ae2" -dependencies = [ - "cfg-if", - "proc-macro2", - "quote", - "syn 2.0.50", -] - -[[package]] -name = "mockall_double" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1ca96e5ac35256ae3e13536edd39b172b88f41615e1d7b653c8ad24524113e8" -dependencies = [ - "cfg-if", - "proc-macro2", - "quote", - "syn 2.0.50", -] - [[package]] name = "nix" version = "0.26.4" @@ -2169,32 +2116,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" -[[package]] -name = "predicates" -version = "3.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68b87bfd4605926cdfefc1c3b5f8fe560e3feca9d5552cf68c466d3d8236c7e8" -dependencies = [ - "anstyle", - "predicates-core", -] - -[[package]] -name = "predicates-core" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174" - -[[package]] -name = "predicates-tree" -version = "1.0.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368ba315fb8c5052ab692e68a0eefec6ec57b23a36959c14496f0b0df2c0cecf" -dependencies = [ - "predicates-core", - "termtree", -] - [[package]] name = "proc-macro-error" version = "1.0.4" @@ -2769,12 +2690,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "termtree" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" - [[package]] name = "textwrap" version = "0.16.1" diff --git a/Cargo.toml b/Cargo.toml index 976fce63..727506e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,8 +29,6 @@ rand = "0.8.0" log = "0.4.13" base64 = "0.21.0" futures_ringbuf = "0.4.0" -async-trait = "0.1.57" -mockall_double = "0.3.0" time = { version = "0.3.7", features = ["formatting"] } instant = { version = "0.1.12", features = ["wasm-bindgen"] } @@ -47,11 +45,12 @@ percent-encoding = { version = "2.1.0" } # Transit dependencies + stun_codec = { version = "0.3.0", optional = true } bytecodec = { version = "0.4.15", optional = true } -noise-protocol = { version = "0.2.0-rc1", optional = true } noise-rust-crypto = { version = "0.6.0-rc.1", optional = true } - +async-trait = { version = "0.1.57", optional = true } +noise-protocol = { version = "0.2", optional = true } # Transfer dependencies rmp-serde = { version = "1.0.0", optional = true } @@ -88,7 +87,6 @@ getrandom = { version = "0.2.5", features = ["js"] } [dev-dependencies] env_logger = "0.11" eyre = "0.6.5" -mockall = "0.12" [features] transit = [ @@ -96,13 +94,13 @@ transit = [ "stun_codec", "if-addrs", "bytecodec", + "async-trait", "noise-protocol", "noise-rust-crypto", ] transfer = ["transit", "tar", "async-tar", "rmp-serde", "zstd"] -dilation = ["transit"] forwarding = ["transit", "rmp-serde"] -default = ["transfer", "dilation"] +default = ["transit", "transfer"] all = ["default", "forwarding"] [profile.release] diff --git a/cli/src/main.rs b/cli/src/main.rs index a04ff666..4902f0dd 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -12,9 +12,7 @@ use futures::{future::Either, Future, FutureExt}; use indicatif::{MultiProgress, ProgressBar}; use std::{io::Write, path::PathBuf}; -use magic_wormhole::{ - dilated_transfer, forwarding, transfer, transit, MailboxConnection, Wormhole, -}; +use magic_wormhole::{forwarding, transfer, transit, MailboxConnection, Wormhole}; fn install_ctrlc_handler( ) -> eyre::Result futures::future::BoxFuture<'static, ()> + Clone> { @@ -54,14 +52,6 @@ fn install_ctrlc_handler( }) } -// send, receive, -#[derive(Debug, Args)] -struct CommonTransferArgs { - /// Enable dilation - #[clap(long = "with-dilation", alias = "with-dilation")] - with_dilation: bool, -} - // send, send-many #[derive(Debug, Args)] struct CommonSenderArgs { @@ -187,8 +177,6 @@ enum WormholeCommand { common_leader: CommonLeaderArgs, #[clap(flatten)] common_send: CommonSenderArgs, - #[clap(flatten)] - common_transfer: CommonTransferArgs, }, /// Send a file to many recipients. READ HELP PAGE FIRST! #[clap( @@ -231,8 +219,6 @@ enum WormholeCommand { common_follower: CommonFollowerArgs, #[clap(flatten)] common_receiver: CommonReceiverArgs, - #[clap(flatten)] - common_transfer: CommonTransferArgs, }, /// Forward ports from one machine to another #[clap(subcommand)] @@ -305,7 +291,6 @@ async fn main() -> eyre::Result<()> { common, common_leader: CommonLeaderArgs { code, code_length }, common_send: CommonSenderArgs { file_name, files }, - common_transfer: CommonTransferArgs { with_dilation: _ }, .. } => { let offer = make_send_offer(files, file_name).await?; @@ -386,31 +371,17 @@ async fn main() -> eyre::Result<()> { common, common_follower: CommonFollowerArgs { code }, common_receiver: CommonReceiverArgs { file_path }, - common_transfer: CommonTransferArgs { with_dilation }, .. } => { - if with_dilation { - log::warn!("The dilation feature is still work in progress. Please remove the `--with-dilation` argument to avoid this."); - } - let transit_abilities = parse_transit_args(&common); let (wormhole, _code, relay_hints) = { - let app_config = dilated_transfer::APP_CONFIG.with_dilation(with_dilation); - let app_config = if with_dilation { - app_config.app_version(dilated_transfer::AppVersion::new(Some( - dilated_transfer::FileTransferV2Mode::Receive, - ))) - } else { - app_config - }; - let connect_fut = Box::pin(parse_and_connect( &mut term, common, code, None, false, - app_config, + transfer::APP_CONFIG, None, clipboard.as_mut(), )); @@ -420,21 +391,15 @@ async fn main() -> eyre::Result<()> { } }; - if with_dilation && peer_allows_dilation(wormhole.peer_version()) { - log::debug!("dilate wormhole"); - let mut dilated_wormhole = wormhole.dilate()?; // need to pass transit relay URL - dilated_wormhole.run().await; - } else { - Box::pin(receive( - wormhole, - relay_hints, - &file_path, - noconfirm, - transit_abilities, - ctrl_c, - )) - .await?; - } + Box::pin(receive( + wormhole, + relay_hints, + &file_path, + noconfirm, + transit_abilities, + ctrl_c, + )) + .await?; }, WormholeCommand::Forward(ForwardCommand::Serve { targets, @@ -587,11 +552,6 @@ async fn main() -> eyre::Result<()> { Ok(()) } -fn peer_allows_dilation(_version: &serde_json::Value) -> bool { - // TODO needs to be implemented - true -} - fn parse_transit_args(args: &CommonArgs) -> transit::Abilities { match (args.force_direct, args.force_relay) { (false, false) => transit::Abilities::ALL_ABILITIES, @@ -905,6 +865,7 @@ async fn send_many( MailboxConnection::connect(transfer::APP_CONFIG, code.clone(), false).await?, ) .await?; + send_in_background( relay_hints.clone(), make_send_offer(files.clone(), file_name.clone()).await?, diff --git a/src/core.rs b/src/core.rs index bceaf98e..0ae51520 100644 --- a/src/core.rs +++ b/src/core.rs @@ -1,23 +1,19 @@ -use std::{any::Any, borrow::Cow}; - -use crate::core::protocol::{WormholeProtocol, WormholeProtocolDefault}; -#[cfg(feature = "dilation")] -use crate::dilation::DilatedWormhole; -use crypto_secretbox as secretbox; -use log::*; -use serde_derive::{Deserialize, Serialize}; -use serde_json::Value; - -use self::{rendezvous::*, server_messages::EncryptedMessage}; - pub(super) mod key; -pub(crate) mod protocol; pub mod rendezvous; mod server_messages; #[cfg(test)] -pub(crate) mod test; +mod test; mod wordlist; +use serde_derive::{Deserialize, Serialize}; +use std::borrow::Cow; + +use self::rendezvous::*; +pub(self) use self::server_messages::EncryptedMessage; +use log::*; + +use crypto_secretbox as secretbox; + #[derive(Debug, thiserror::Error)] #[non_exhaustive] pub enum WormholeError { @@ -48,8 +44,6 @@ pub enum WormholeError { Crypto, #[error("Nameplate is unclaimed: {}", _0)] UnclaimedNameplate(Nameplate), - #[error("Dilation version mismatch")] - DilationVersion, } impl WormholeError { @@ -186,7 +180,7 @@ impl MailboxConnection { /// use magic_wormhole::{transfer::APP_CONFIG, Code, MailboxConnection, Nameplate}; /// let config = APP_CONFIG; /// let code = Code::new(&Nameplate::new("5"), "password"); - /// let mut mailbox_connection = MailboxConnection::connect(config, code, false).await?; + /// let mailbox_connection = MailboxConnection::connect(config, code, false).await?; /// # Ok(()) })} /// ``` pub async fn connect( @@ -228,12 +222,12 @@ impl MailboxConnection { /// async_std::task::block_on(async { /// use magic_wormhole::{transfer::APP_CONFIG, MailboxConnection, Mood}; /// let config = APP_CONFIG; - /// let mut mailbox_connection = MailboxConnection::create_with_password(config, "secret") + /// let mailbox_connection = MailboxConnection::create_with_password(config, "secret") /// .await?; /// mailbox_connection.shutdown(Mood::Happy).await?; /// # Ok(())})} /// ``` - pub async fn shutdown(&mut self, mood: Mood) -> Result<(), WormholeError> { + pub async fn shutdown(self, mood: Mood) -> Result<(), WormholeError> { self.server .shutdown(mood) .await @@ -243,15 +237,36 @@ impl MailboxConnection { #[derive(Debug)] pub struct Wormhole { - protocol: Box, + server: RendezvousServer, + phase: u64, + key: key::Key, + appid: AppID, + /** + * If you're paranoid, let both sides check that they calculated the same verifier. + * + * PAKE hardens a standard key exchange with a password ("password authenticated") in order + * to mitigate potential man in the middle attacks that would otherwise be possible. Since + * the passwords usually are not of hight entropy, there is a low-probability possible of + * an attacker guessing the password correctly, enabling them to MitM the connection. + * + * Not only is that probability low, but they also have only one try per connection and a failed + * attempts will be noticed by both sides. Nevertheless, comparing the verifier mitigates that + * attack vector. + */ + pub verifier: Box, + /** + * Our "app version" information that we sent. See the [`peer_version`] for more information. + */ + pub our_version: Box, + /** + * Protocol version information from the other side. + * This is bound by the [`AppID`]'s protocol and thus shall be handled on a higher level + * (e.g. by the file transfer API). + */ + pub peer_version: serde_json::Value, } impl Wormhole { - #[cfg(test)] - pub fn new(protocol: Box) -> Self { - Wormhole { protocol } - } - /** * Generate a code and connect to the rendezvous server. * @@ -338,10 +353,6 @@ impl Wormhole { /* Send versions message */ let mut versions = key::VersionsMessage::new(); versions.set_app_versions(serde_json::to_value(&config.app_version).unwrap()); - #[cfg(feature = "dilation")] - if config.with_dilation { - versions.enable_dilation(); - } let (version_phase, version_msg) = key::build_version_msg(server.side(), &key, &versions); server.send_peer_message(version_phase, version_msg).await?; let peer_version = server.next_peer_message_some().await?; @@ -364,12 +375,13 @@ impl Wormhole { /* We are now fully initialized! Up and running! :tada: */ Ok(Self { - protocol: Box::new(WormholeProtocolDefault::new( - server, - config, - key::Key::new(key.into()), - peer_version, - )), + server, + appid: config.id, + phase: 0, + key: key::Key::new(key.into()), + verifier: Box::new(key::derive_verifier(&key)), + our_version: Box::new(config.app_version), + peer_version, }) } @@ -378,19 +390,16 @@ impl Wormhole { todo!() } - /** - * create a dilated wormhole - */ - #[cfg(feature = "dilation")] - pub fn dilate(self) -> Result { - // XXX: create endpoints? - // get versions from the other side and check if they support dilation. - let can_they_dilate = &self.protocol.peer_version()["can-dilate"]; - if !can_they_dilate.is_null() && can_they_dilate[0] != "1" { - return Err(WormholeError::DilationVersion); - } - - Ok(DilatedWormhole::new(self, MySide::generate(8))) + /** Send an encrypted message to peer */ + pub async fn send(&mut self, plaintext: Vec) -> Result<(), WormholeError> { + let phase_string = Phase::numeric(self.phase); + self.phase += 1; + let data_key = key::derive_phase_key(self.server.side(), &self.key, &phase_string); + let (_nonce, encrypted) = key::encrypt_data(&data_key, &plaintext); + self.server + .send_peer_message(phase_string, encrypted) + .await?; + Ok(()) } /** @@ -403,21 +412,33 @@ impl Wormhole { * * If the serialization fails */ - pub async fn send_json( + pub async fn send_json( &mut self, message: &T, ) -> Result<(), WormholeError> { - self.send_json_with_phase(message, Phase::numeric).await - } + self.send(serde_json::to_vec(message).unwrap()).await + } + + /** Receive an encrypted message from peer */ + pub async fn receive(&mut self) -> Result, WormholeError> { + loop { + let peer_message = match self.server.next_peer_message().await? { + Some(peer_message) => peer_message, + None => continue, + }; + if peer_message.phase.to_num().is_none() { + // TODO: log and ignore, for future expansion + todo!("log and ignore, for future expansion"); + } - pub async fn send_json_with_phase( - &mut self, - message: &T, - phase_provider: PhaseProvider, - ) -> Result<(), WormholeError> { - self.protocol - .send_with_phase(serde_json::to_vec(message).unwrap(), phase_provider) - .await + // TODO maybe reorder incoming messages by phase numeral? + let decrypted_message = peer_message + .decrypt(&self.key) + .ok_or(WormholeError::Crypto)?; + + // Send to client + return Ok(decrypted_message); + } } /** @@ -431,7 +452,7 @@ impl Wormhole { where T: for<'a> serde::Deserialize<'a>, { - self.protocol.receive().await.map(|data: Vec| { + self.receive().await.map(|data: Vec| { serde_json::from_slice(&data).map_err(|e| { log::error!( "Received invalid data from peer: '{}'", @@ -442,8 +463,9 @@ impl Wormhole { }) } - pub async fn close(&mut self) -> Result<(), WormholeError> { - self.protocol.close().await + pub async fn close(self) -> Result<(), WormholeError> { + log::debug!("Closing Wormhole…"); + self.server.shutdown(Mood::Happy).await.map_err(Into::into) } /** @@ -451,7 +473,7 @@ impl Wormhole { * This determines the upper-layer protocol. Only wormholes with the same value can talk to each other. */ pub fn appid(&self) -> &AppID { - self.protocol.appid() + &self.appid } /** @@ -459,29 +481,13 @@ impl Wormhole { * Can be used to derive sub-keys for different purposes. */ pub fn key(&self) -> &key::Key { - self.protocol.key() - } - - pub fn peer_version(&self) -> &Value { - self.protocol.peer_version() - } - - pub fn our_version(&self) -> &Box { - self.protocol.our_version() + &self.key } } // the serialized forms of these variants are part of the wire protocol, so // they must be spelled exactly as shown -#[derive( - Debug, - PartialEq, - Copy, - Clone, - serde_derive::Deserialize, - serde_derive::Serialize, - derive_more::Display, -)] +#[derive(Debug, PartialEq, Copy, Clone, Deserialize, Serialize, derive_more::Display)] pub enum Mood { #[serde(rename = "happy")] Happy, @@ -495,11 +501,8 @@ pub enum Mood { Unwelcome, } -#[allow(dead_code)] -pub const APPID_RAW: &str = "lothar.com/wormhole/text-or-file-xfer"; - /** - * Wormhole configuration corresponding to an upper layer protocol + * Wormhole configuration corresponding to an uppler layer protocol * * There are multiple different protocols built on top of the core * Wormhole protocol. They are identified by a unique URI-like ID string @@ -510,14 +513,13 @@ pub const APPID_RAW: &str = "lothar.com/wormhole/text-or-file-xfer"; * See [`crate::transfer::APP_CONFIG`], which entails */ #[derive(PartialEq, Eq, Clone, Debug)] -pub struct AppConfig { +pub struct AppConfig { pub id: AppID, pub rendezvous_url: Cow<'static, str>, pub app_version: V, - pub with_dilation: bool, } -impl AppConfig { +impl AppConfig { pub fn id(mut self, id: AppID) -> Self { self.id = id; self @@ -527,12 +529,9 @@ impl AppConfig { self.rendezvous_url = rendezvous_url; self } +} - pub fn with_dilation(mut self, with_dilation: bool) -> Self { - self.with_dilation = with_dilation; - self - } - +impl AppConfig { pub fn app_version(mut self, app_version: V) -> Self { self.app_version = app_version; self @@ -564,25 +563,17 @@ impl From for AppID { // MySide is used for the String that we send in all our outbound messages #[derive( - PartialOrd, - PartialEq, - Eq, - Clone, - Debug, - Deserialize, - Serialize, - derive_more::Display, - derive_more::Deref, + PartialEq, Eq, Clone, Debug, Deserialize, Serialize, derive_more::Display, derive_more::Deref, )] #[serde(transparent)] #[display(fmt = "MySide({})", "&*_0")] pub struct MySide(EitherSide); impl MySide { - pub fn generate(length: usize) -> MySide { + pub fn generate() -> MySide { use rand::{rngs::OsRng, RngCore}; - let mut bytes = vec![0; length]; + let mut bytes: [u8; 5] = [0; 5]; OsRng.fill_bytes(&mut bytes); MySide(EitherSide(hex::encode(bytes))) @@ -598,15 +589,7 @@ impl MySide { // TheirSide is used for the string that arrives inside inbound messages #[derive( - PartialOrd, - PartialEq, - Eq, - Clone, - Debug, - Deserialize, - Serialize, - derive_more::Display, - derive_more::Deref, + PartialEq, Eq, Clone, Debug, Deserialize, Serialize, derive_more::Display, derive_more::Deref, )] #[serde(transparent)] #[display(fmt = "TheirSide({})", "&*_0")] @@ -619,15 +602,7 @@ impl> From for TheirSide { } #[derive( - PartialOrd, - PartialEq, - Eq, - Clone, - Debug, - Deserialize, - Serialize, - derive_more::Display, - derive_more::Deref, + PartialEq, Eq, Clone, Debug, Deserialize, Serialize, derive_more::Display, derive_more::Deref, )] #[serde(transparent)] #[deref(forward)] @@ -640,12 +615,6 @@ impl> From for EitherSide { } } -impl From for TheirSide { - fn from(side: MySide) -> TheirSide { - TheirSide(side.0) - } -} - #[derive(PartialEq, Eq, Clone, Debug, Hash, Deserialize, Serialize, derive_more::Display)] #[serde(transparent)] pub struct Phase(pub Cow<'static, str>); @@ -658,10 +627,6 @@ impl Phase { Phase(phase.to_string().into()) } - pub fn dilation(phase: u64) -> Self { - Phase(format!("dilate-{}", phase).into()) - } - pub fn is_version(&self) -> bool { self == &Self::VERSION } @@ -673,8 +638,6 @@ impl Phase { } } -type PhaseProvider = fn(u64) -> Phase; - #[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize, derive_more::Display)] #[serde(transparent)] pub struct Mailbox(pub String); @@ -725,15 +688,3 @@ impl Code { Nameplate::new(self.0.split('-').next().unwrap()) } } - -#[derive(Serialize, Deserialize, Clone, Debug)] -#[serde(rename_all = "kebab-case", tag = "type")] -pub enum Ability { - DirectTcpV1, - RelayV1, - RelayV2, - #[cfg(any())] - NoiseCryptoV1, - #[serde(other)] - Other, -} diff --git a/src/core/key.rs b/src/core/key.rs index 36730872..90dd2470 100644 --- a/src/core/key.rs +++ b/src/core/key.rs @@ -102,44 +102,24 @@ pub fn make_pake(password: &str, appid: &AppID) -> (Spake2, Vec, - //#[serde(default)] - pub can_dilate: Option<[Cow<'static, str>; 1]>, - //#[serde(default)] - pub dilation_abilities: Option>, - //#[serde(default)] - #[serde(rename = "app_versions")] + #[serde(default)] pub app_versions: serde_json::Value, // resume: Option, } impl VersionsMessage { pub fn new() -> Self { - // Default::default() - Self { - abilities: vec![], - can_dilate: None, - dilation_abilities: Some(std::borrow::Cow::Borrowed(&[ - Ability::DirectTcpV1, - Ability::RelayV1, - ])), - app_versions: serde_json::Value::Null, - } + Default::default() } pub fn set_app_versions(&mut self, versions: serde_json::Value) { self.app_versions = versions; } - #[cfg(feature = "dilation")] - pub fn enable_dilation(&mut self) { - self.can_dilate = Some([std::borrow::Cow::Borrowed("1")]) - } - // pub fn add_resume_ability(&mut self, _resume: ()) { // self.abilities.push("resume-v1".into()) // } @@ -377,16 +357,4 @@ mod test { // None => panic!(), // } // } - - #[test] - #[cfg(dilation)] - fn test_versions_message_can_dilate() { - let mut message = VersionsMessage::new(); - - assert_eq!(message.can_dilate, None); - - message.enable_dilation(); - - assert_eq!(message.can_dilate, Some([std::borrow::Cow::Borrowed("1")])); - } } diff --git a/src/core/protocol.rs b/src/core/protocol.rs deleted file mode 100644 index 0c2ca3cb..00000000 --- a/src/core/protocol.rs +++ /dev/null @@ -1,150 +0,0 @@ -use async_trait::async_trait; -use std::{any::Any, fmt::Debug}; - -#[cfg(test)] -use mockall::automock; - -use crate::{ - core::{ - key::{derive_phase_key, derive_verifier, encrypt_data}, - PhaseProvider, - }, - rendezvous::RendezvousServer, - AppConfig, AppID, Key, Mood, WormholeError, WormholeKey, -}; - -#[derive(Debug)] -pub struct WormholeProtocolDefault { - server: RendezvousServer, - phase: u64, - key: Key, - appid: AppID, - /** - * If you're paranoid, let both sides check that they calculated the same verifier. - * - * PAKE hardens a standard key exchange with a password ("password authenticated") in order - * to mitigate potential man in the middle attacks that would otherwise be possible. Since - * the passwords usually are not of hight entropy, there is a low-probability possible of - * an attacker guessing the password correctly, enabling them to MitM the connection. - * - * Not only is that probability low, but they also have only one try per connection and a failed - * attempts will be noticed by both sides. Nevertheless, comparing the verifier mitigates that - * attack vector. - */ - pub verifier: Box, - /** - * Our "app version" information that we sent. See the [`peer_version`] for more information. - */ - pub our_version: Box, - /** - * Protocol version information from the other side. - * This is bound by the [`AppID`]'s protocol and thus shall be handled on a higher level - * (e.g. by the file transfer API). - */ - pub peer_version: serde_json::Value, -} - -impl WormholeProtocolDefault { - pub fn new( - server: RendezvousServer, - config: AppConfig, - key: Key, - peer_version: serde_json::Value, - ) -> Self - where - T: serde::Serialize + Send + Sync + Sized + 'static, - { - let verifier = Box::new(derive_verifier(&key)); - Self { - server, - appid: config.id, - phase: 0, - key, - verifier, - our_version: Box::new(config.app_version), - peer_version, - } - } -} - -#[async_trait] -impl WormholeProtocol for WormholeProtocolDefault { - /** Send an encrypted message to peer */ - async fn send_with_phase( - &mut self, - plaintext: Vec, - phase_provider: PhaseProvider, - ) -> Result<(), WormholeError> { - let current_phase = phase_provider(self.phase); - self.phase += 1; - let data_key = derive_phase_key(self.server.side(), &self.key, ¤t_phase); - let (_nonce, encrypted) = encrypt_data(&data_key, &plaintext); - self.server - .send_peer_message(current_phase, encrypted) - .await?; - Ok(()) - } - - /** Receive an encrypted message from peer */ - async fn receive(&mut self) -> Result, WormholeError> { - loop { - let peer_message = match self.server.next_peer_message().await? { - Some(peer_message) => peer_message, - None => continue, - }; - - // TODO maybe reorder incoming messages by phase numeral? - let decrypted_message = peer_message - .decrypt(&self.key) - .ok_or(WormholeError::Crypto)?; - - // Send to client - return Ok(decrypted_message); - } - } - - async fn close(&mut self) -> Result<(), WormholeError> { - log::debug!("Closing Wormhole…"); - self.server.shutdown(Mood::Happy).await.map_err(Into::into) - } - - /** - * The `AppID` this wormhole is bound to. - * This determines the upper-layer protocol. Only wormholes with the same value can talk to each other. - */ - fn appid(&self) -> &AppID { - &self.appid - } - - /** - * The symmetric encryption key used by this connection. - * Can be used to derive sub-keys for different purposes. - */ - fn key(&self) -> &Key { - &self.key - } - - fn peer_version(&self) -> &serde_json::Value { - &self.peer_version - } - - fn our_version(&self) -> &Box { - &self.our_version - } -} - -#[cfg_attr(test, automock)] -#[async_trait] -pub trait WormholeProtocol: Debug + Send + Sync { - async fn send_with_phase( - &mut self, - plaintext: Vec, - phase_provider: PhaseProvider, - ) -> Result<(), WormholeError>; - async fn receive(&mut self) -> Result, WormholeError>; - async fn close(&mut self) -> Result<(), WormholeError>; - fn appid(&self) -> &AppID; - fn key(&self) -> &Key; - fn peer_version(&self) -> &serde_json::Value; - fn our_version(&self) -> &Box; -} diff --git a/src/core/rendezvous.rs b/src/core/rendezvous.rs index a1acc183..a107297c 100644 --- a/src/core/rendezvous.rs +++ b/src/core/rendezvous.rs @@ -339,7 +339,7 @@ impl RendezvousServer { appid: &AppID, relay_url: &str, ) -> Result<(Self, Option), RendezvousError> { - let side = MySide::generate(5); + let side = MySide::generate(); let mut connection; #[cfg(not(target_arch = "wasm32"))] @@ -588,31 +588,28 @@ impl RendezvousServer { Ok(()) } - pub async fn shutdown(&mut self, mood: Mood) -> Result<(), RendezvousError> { + pub async fn shutdown(mut self, mood: Mood) -> Result<(), RendezvousError> { if let Some(MailboxMachine { - ref nameplate, - ref mailbox, - ref mut queue, + nameplate, + mailbox, + mut queue, .. }) = self.state { if let Some(nameplate) = nameplate { self.connection - .send_message(&OutboundMessage::release(nameplate.to_owned()), Some(queue)) + .send_message(&OutboundMessage::release(nameplate), Some(&mut queue)) .await?; - match self.connection.receive_reply(Some(queue)).await? { + match self.connection.receive_reply(Some(&mut queue)).await? { RendezvousReply::Released => (), other => return Err(RendezvousError::invalid_message("released", other)), }; } self.connection - .send_message( - &OutboundMessage::close(mailbox.to_owned(), mood), - Some(queue), - ) + .send_message(&OutboundMessage::close(mailbox, mood), Some(&mut queue)) .await?; - match self.connection.receive_reply(Some(queue)).await? { + match self.connection.receive_reply(Some(&mut queue)).await? { RendezvousReply::Closed => (), other => return Err(RendezvousError::invalid_message("closed", other)), }; diff --git a/src/core/server_messages.rs b/src/core/server_messages.rs index ac2f45ce..ac18be27 100644 --- a/src/core/server_messages.rs +++ b/src/core/server_messages.rs @@ -256,6 +256,7 @@ pub enum InboundMessage { #[cfg(test)] mod test { use super::*; + use serde_json::{from_str, json, Value}; #[test] fn test_bind() { @@ -264,10 +265,10 @@ mod test { MySide::unchecked_from_string(String::from("side1")), ); let s = serde_json::to_string(&m1).unwrap(); - let m2: serde_json::Value = serde_json::from_str(&s).unwrap(); + let m2: Value = from_str(&s).unwrap(); assert_eq!( m2, - serde_json::json!({"type": "bind", "appid": "appid", + json!({"type": "bind", "appid": "appid", "side": "side1"}) ); } @@ -276,59 +277,50 @@ mod test { fn test_list() { let m1 = OutboundMessage::List; let s = serde_json::to_string(&m1).unwrap(); - let m2: serde_json::Value = serde_json::from_str(&s).unwrap(); - assert_eq!(m2, serde_json::json!({"type": "list"})); + let m2: Value = from_str(&s).unwrap(); + assert_eq!(m2, json!({"type": "list"})); } #[test] fn test_allocate() { let m1 = OutboundMessage::Allocate; let s = serde_json::to_string(&m1).unwrap(); - let m2: serde_json::Value = serde_json::from_str(&s).unwrap(); - assert_eq!(m2, serde_json::json!({"type": "allocate"})); + let m2: Value = from_str(&s).unwrap(); + assert_eq!(m2, json!({"type": "allocate"})); } #[test] fn test_claim() { let m1 = OutboundMessage::claim("nameplate1"); let s = serde_json::to_string(&m1).unwrap(); - let m2: serde_json::Value = serde_json::from_str(&s).unwrap(); - assert_eq!( - m2, - serde_json::json!({"type": "claim", "nameplate": "nameplate1"}) - ); + let m2: Value = from_str(&s).unwrap(); + assert_eq!(m2, json!({"type": "claim", "nameplate": "nameplate1"})); } #[test] fn test_release() { let m1 = OutboundMessage::release("nameplate1"); let s = serde_json::to_string(&m1).unwrap(); - let m2: serde_json::Value = serde_json::from_str(&s).unwrap(); - assert_eq!( - m2, - serde_json::json!({"type": "release", "nameplate": "nameplate1"}) - ); + let m2: Value = from_str(&s).unwrap(); + assert_eq!(m2, json!({"type": "release", "nameplate": "nameplate1"})); } #[test] fn test_open() { let m1 = OutboundMessage::open(Mailbox(String::from("mailbox1"))); let s = serde_json::to_string(&m1).unwrap(); - let m2: serde_json::Value = serde_json::from_str(&s).unwrap(); - assert_eq!( - m2, - serde_json::json!({"type": "open", "mailbox": "mailbox1"}) - ); + let m2: Value = from_str(&s).unwrap(); + assert_eq!(m2, json!({"type": "open", "mailbox": "mailbox1"})); } #[test] fn test_add() { let m1 = OutboundMessage::add(Phase("phase1".into()), b"body".to_vec()); let s = serde_json::to_string(&m1).unwrap(); - let m2: serde_json::Value = serde_json::from_str(&s).unwrap(); + let m2: Value = from_str(&s).unwrap(); assert_eq!( m2, - serde_json::json!({"type": "add", "phase": "phase1", + json!({"type": "add", "phase": "phase1", "body": "626f6479"}) ); // body is hex-encoded } @@ -337,10 +329,10 @@ mod test { fn test_close() { let m1 = OutboundMessage::close(Mailbox(String::from("mailbox1")), Mood::Happy); let s = serde_json::to_string(&m1).unwrap(); - let m2: serde_json::Value = serde_json::from_str(&s).unwrap(); + let m2: Value = from_str(&s).unwrap(); assert_eq!( m2, - serde_json::json!({"type": "close", "mailbox": "mailbox1", + json!({"type": "close", "mailbox": "mailbox1", "mood": "happy"}) ); } @@ -349,10 +341,10 @@ mod test { fn test_close_errory() { let m1 = OutboundMessage::close(Mailbox(String::from("mailbox1")), Mood::Errory); let s = serde_json::to_string(&m1).unwrap(); - let m2: serde_json::Value = serde_json::from_str(&s).unwrap(); + let m2: Value = from_str(&s).unwrap(); assert_eq!( m2, - serde_json::json!({"type": "close", "mailbox": "mailbox1", + json!({"type": "close", "mailbox": "mailbox1", "mood": "errory"}) ); } @@ -361,10 +353,10 @@ mod test { fn test_close_scared() { let m1 = OutboundMessage::close(Mailbox(String::from("mailbox1")), Mood::Scared); let s = serde_json::to_string(&m1).unwrap(); - let m2: serde_json::Value = serde_json::from_str(&s).unwrap(); + let m2: Value = from_str(&s).unwrap(); assert_eq!( m2, - serde_json::json!({"type": "close", "mailbox": "mailbox1", + json!({"type": "close", "mailbox": "mailbox1", "mood": "scary"}) ); } @@ -433,12 +425,9 @@ mod test { bits: 6, resource: "resource-string".into(), }), - other: [( - "dark-ritual".to_string(), - serde_json::json!({ "hocrux": true }) - )] - .into_iter() - .collect() + other: [("dark-ritual".to_string(), json!({ "hocrux": true }))] + .into_iter() + .collect() }), current_cli_version: None, error: None, diff --git a/src/core/test.rs b/src/core/test.rs index 2d852750..1647a27a 100644 --- a/src/core/test.rs +++ b/src/core/test.rs @@ -18,12 +18,11 @@ pub const APP_CONFIG: AppConfig<()> = AppConfig::<()> { id: TEST_APPID, rendezvous_url: Cow::Borrowed(crate::rendezvous::DEFAULT_RENDEZVOUS_SERVER), app_version: (), - with_dilation: false, }; const TIMEOUT: Duration = Duration::from_secs(60); -pub fn init_logger() { +fn init_logger() { /* Ignore errors from succeedent initialization tries */ let _ = env_logger::builder() .filter_level(log::LevelFilter::Debug) @@ -184,8 +183,7 @@ pub async fn test_file_rust2rust_deprecated() -> eyre::Result<()> { .name("sender".to_owned()) .spawn(async { let (welcome, wormhole_future) = - Wormhole::connect_without_code(transfer::APP_CONFIG.id(TEST_APPID).clone(), 2) - .await?; + Wormhole::connect_without_code(transfer::APP_CONFIG.id(TEST_APPID).clone(), 2).await?; if let Some(welcome) = &welcome.welcome { log::info!("Got welcome: {}", welcome); } @@ -260,8 +258,7 @@ pub async fn test_file_rust2rust() -> eyre::Result<()> { .name("sender".to_owned()) .spawn(async { let mailbox_connection = - MailboxConnection::create(transfer::APP_CONFIG.id(TEST_APPID).clone(), 2) - .await?; + MailboxConnection::create(transfer::APP_CONFIG.id(TEST_APPID).clone(), 2).await?; if let Some(welcome) = &mailbox_connection.welcome { log::info!("Got welcome: {}", welcome); } @@ -407,7 +404,7 @@ pub async fn test_send_many() -> eyre::Result<()> { .await?, ) .await?; - log::info!("Got key: {}", &wormhole.key()); + log::info!("Got key: {}", &wormhole.key); let transfer::ReceiveRequest::V1(req) = crate::transfer::request( wormhole, default_relay_hints(), diff --git a/src/core/wordlist.rs b/src/core/wordlist.rs index 64f8c244..966d82ef 100644 --- a/src/core/wordlist.rs +++ b/src/core/wordlist.rs @@ -1,4 +1,5 @@ use rand::{rngs::OsRng, seq::SliceRandom}; +use serde_json::{self, Value}; use std::fmt; #[derive(PartialEq)] @@ -68,8 +69,7 @@ impl Wordlist { } fn load_pgpwords() -> Vec> { - let raw_words_value: serde_json::Value = - serde_json::from_str(include_str!("pgpwords.json")).unwrap(); + let raw_words_value: Value = serde_json::from_str(include_str!("pgpwords.json")).unwrap(); let raw_words = raw_words_value.as_object().unwrap(); let mut even_words: Vec = Vec::with_capacity(256); even_words.resize(256, String::from("")); diff --git a/src/dilated_transfer/mod.rs b/src/dilated_transfer/mod.rs deleted file mode 100644 index 7bb54d66..00000000 --- a/src/dilated_transfer/mod.rs +++ /dev/null @@ -1,77 +0,0 @@ -use crate::{core::APPID_RAW, AppID}; -use std::borrow::Cow; - -pub const APP_CONFIG: crate::AppConfig = crate::AppConfig:: { - id: AppID(Cow::Borrowed(APPID_RAW)), - rendezvous_url: Cow::Borrowed(crate::rendezvous::DEFAULT_RENDEZVOUS_SERVER), - app_version: AppVersion::new(Some(FileTransferV2Mode::Send)), - with_dilation: false, -}; - -#[derive(Clone, serde_derive::Serialize, serde_derive::Deserialize)] -#[serde(rename_all = "kebab-case")] -#[serde(rename = "transfer")] -pub enum FileTransferV2Mode { - Send, - Receive, - Connect, -} - -#[derive(Clone, serde_derive::Serialize, serde_derive::Deserialize)] -#[serde(rename_all = "kebab-case")] -struct DilatedTransfer { - mode: FileTransferV2Mode, -} - -#[derive(Clone, serde_derive::Serialize, serde_derive::Deserialize)] -#[serde(rename_all = "kebab-case")] -pub struct AppVersion { - // #[serde(default)] - // abilities: Cow<'static, [Cow<'static, str>]>, - // #[serde(default)] - // transfer_v2: Option, - - // XXX: we don't want to send "can-dilate" key for non-dilated - // wormhole, would making this an Option help? i.e. when the value - // is a None, we don't serialize that into the json and do it only - // when it is a "Some" value? - // overall versions payload is of the form: - // b'{"can-dilate": ["1"], "dilation-abilities": [{"type": "direct-tcp-v1"}, {"type": "relay-v1"}], "app_versions": {"transfer": {"mode": "send", "features": {}}}}' - - //can_dilate: Option<[Cow<'static, str>; 1]>, - //dilation_abilities: Cow<'static, [Ability; 2]>, - #[serde(rename = "transfer")] - app_versions: Option, -} - -impl AppVersion { - pub const fn new(mode: Option) -> Self { - // let can_dilate: Option<[Cow<'static, str>; 1]> = if enable_dilation { - // Some([std::borrow::Cow::Borrowed("1")]) - // } else { - // None - // }; - - let option = match mode { - Some(mode) => Some(DilatedTransfer { mode }), - None => None, - }; - - Self { - // abilities: Cow::Borrowed([Cow::Borrowed("transfer-v1"), Cow::Borrowed("transfer-v2")]), - // transfer_v2: Some(AppVersionTransferV2Hint::new()) - // can_dilate: can_dilate, - // dilation_abilities: std::borrow::Cow::Borrowed(&[ - // Ability{ ty: std::borrow::Cow::Borrowed("direct-tcp-v1") }, - // Ability{ ty: std::borrow::Cow::Borrowed("relay-v1") }, - // ]), - app_versions: option, - } - } -} - -impl Default for AppVersion { - fn default() -> Self { - Self::new(Some(FileTransferV2Mode::Send)) - } -} diff --git a/src/dilation/api.rs b/src/dilation/api.rs deleted file mode 100644 index e097a6a5..00000000 --- a/src/dilation/api.rs +++ /dev/null @@ -1,35 +0,0 @@ -use crate::core::MySide; -use derive_more::Display; -use serde_derive::{Deserialize, Serialize}; - -// from IO to DilationCore -#[derive(Debug, Clone, PartialEq, Display, Deserialize)] -pub enum IOEvent { - WormholeMessageReceived(String), - TCPConnectionLost, - TCPConnectionMade, -} - -/// Commands to be executed -#[derive(Debug, Clone, PartialEq, Display)] -#[allow(dead_code)] -pub enum ManagerCommand { - // XXX: include API calls to IO layer - Protocol(ProtocolCommand), - IO(IOCommand), -} - -/// Protocol level commands -#[derive(Debug, Clone, PartialEq, Display, Serialize)] -#[serde(tag = "type")] -pub enum ProtocolCommand { - #[serde(rename = "please")] - SendPlease { side: MySide }, -} - -/// Protocol level commands -#[derive(Debug, Clone, PartialEq, Display)] -#[allow(dead_code)] -pub enum IOCommand { - CloseConnection, -} diff --git a/src/dilation/events.rs b/src/dilation/events.rs deleted file mode 100644 index a465b97c..00000000 --- a/src/dilation/events.rs +++ /dev/null @@ -1,89 +0,0 @@ -use derive_more::Display; -use serde_derive::Deserialize; - -use crate::{ - core::TheirSide, - dilation::api::{IOEvent, ManagerCommand}, - transit::Hints, -}; - -use super::api::ProtocolCommand; - -#[derive(Debug, Clone, PartialEq, Deserialize)] -pub enum Event { - //IO(IOAction), - // All state machine events - Manager(ManagerEvent), - Connection(IOEvent), -} - -impl From for ManagerCommand { - fn from(r: ProtocolCommand) -> ManagerCommand { - ManagerCommand::Protocol(r) - } -} - -impl From for Event { - fn from(r: ManagerEvent) -> Event { - Event::Manager(r) - } -} - -// individual fsm events -#[derive(Display, Debug, Clone, PartialEq, Deserialize)] -#[serde(tag = "type")] -pub enum ManagerEvent { - #[serde(rename = "start")] - Start, - #[serde(rename = "please")] - RxPlease { - side: TheirSide, - }, - #[serde(rename = "connection-hints")] - RxHints { - hints: Hints, - }, - RxReconnect, - RxReconnecting, - ConnectionMade, - ConnectionLostLeader, - ConnectionLostFollower, - Stop, -} - -// XXX: for Connector fsm events -// ... -// XXX - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_display_please_event() { - let event = ManagerEvent::RxPlease { - side: TheirSide::from("f91dcdaccc7cc336"), - }; - assert_eq!(format!("{}", event), "TheirSide(f91dcdaccc7cc336)"); - } - - #[test] - fn test_manager_event_deserialisation_start() { - let result: ManagerEvent = - serde_json::from_str(r#"{"type": "start"}"#).expect("parse error"); - assert_eq!(result, ManagerEvent::Start); - } - - #[test] - fn test_manager_event_deserialisation_rxplease() { - let result: ManagerEvent = - serde_json::from_str(r#"{"type": "please", "side": "f91dcdaccc7cc336"}"#) - .expect("parse error"); - assert_eq!( - result, - ManagerEvent::RxPlease { - side: TheirSide::from("f91dcdaccc7cc336") - } - ); - } -} diff --git a/src/dilation/manager.rs b/src/dilation/manager.rs deleted file mode 100644 index 40063fc0..00000000 --- a/src/dilation/manager.rs +++ /dev/null @@ -1,270 +0,0 @@ -use derive_more::Display; -#[cfg(test)] -use mockall::automock; - -use crate::{ - core::{MySide, TheirSide}, - dilation::api::ManagerCommand, - WormholeError, -}; - -use super::{api::ProtocolCommand, events::ManagerEvent}; - -#[derive(Debug, PartialEq, Display)] -pub enum Role { - Leader, - Follower, -} - -#[derive(Debug, PartialEq, Clone, Copy, Display)] -#[allow(dead_code)] -pub enum State { - Waiting, - Wanting, - Connecting, - Connected, - Abandoning, - Flushing, - Lonely, - Stopping, - Stopped, -} - -pub struct ManagerMachine { - pub side: MySide, - pub role: Role, - pub state: Option, -} - -#[cfg_attr(test, automock)] -impl ManagerMachine { - pub fn new(side: MySide) -> Self { - ManagerMachine { - side, - role: Role::Follower, - state: Some(State::Wanting), - } - } - - pub fn current_state(&self) -> Option { - self.state - } - - fn choose_role(&self, theirside: &TheirSide) -> Role { - let myside: TheirSide = self.side.clone().into(); - if myside > *theirside { - Role::Leader - } else { - Role::Follower - } - } - - pub fn process( - &mut self, - event: ManagerEvent, - side: &MySide, - command_handler: &mut dyn FnMut(ManagerCommand) -> Result<(), WormholeError>, - ) { - log::debug!( - "processing event: state={}, event={}", - self.state.unwrap(), - &event - ); - // given the event and the current state, generate output - // event and move to the new state - use State::*; - let mut command = None; - let current_state = self.state.unwrap(); - let new_state = match current_state { - Waiting => match event { - ManagerEvent::Start => { - command = Some(ManagerCommand::from(ProtocolCommand::SendPlease { - side: side.clone(), - })); - Wanting - }, - ManagerEvent::Stop => { - // actions.addAction(NotifyStopped) - Stopped - }, - _ => { - panic! {"unexpected event {:?} for state {:?}", current_state, event} - }, - }, - Wanting => match event { - ManagerEvent::RxPlease { side: their_side } => { - command = Some(ManagerCommand::from(ProtocolCommand::SendPlease { - side: side.clone(), - })); - let role = self.choose_role(&their_side.clone()); - log::debug!( - "role: {}", - if role == Role::Leader { - "leader" - } else { - "follower" - } - ); - self.role = role; - Connecting - }, - ManagerEvent::Stop => Stopped, - ManagerEvent::RxHints { hints: _ } => current_state, - _ => { - panic! {"unexpected event {:?} for state {:?}", current_state, event} - }, - }, - Connecting => match event { - ManagerEvent::RxHints { hints } => { - log::debug!("received connection hints: {:?}", hints); - // TODO store the other side's hints - current_state - }, - ManagerEvent::Stop => Stopped, - ManagerEvent::ConnectionMade => Connected, - ManagerEvent::RxReconnect => current_state, - _ => { - panic! {"unexpected event {:?} for state {:?}", current_state, event} - }, - }, - Connected => match event { - ManagerEvent::RxReconnect => Abandoning, - ManagerEvent::RxHints { hints: _ } => current_state, - ManagerEvent::ConnectionLostFollower => Lonely, - ManagerEvent::ConnectionLostLeader => Flushing, - ManagerEvent::Stop => Stopped, - _ => { - panic! {"unexpected event {:?} for state {:?}", current_state, event} - }, - }, - Abandoning => match event { - ManagerEvent::RxHints { hints: _ } => current_state, - ManagerEvent::ConnectionLostFollower => Connecting, - ManagerEvent::Stop => Stopped, - _ => { - panic! {"unexpected event {:?} for state {:?}", current_state, event} - }, - }, - Flushing => match event { - ManagerEvent::RxReconnecting => Connecting, - ManagerEvent::Stop => Stopped, - ManagerEvent::RxHints { hints: _ } => current_state, - _ => { - panic! {"unexpected event {:?} for state {:?}", current_state, event} - }, - }, - Lonely => match event { - ManagerEvent::RxReconnect => Connecting, - ManagerEvent::Stop => Stopped, - ManagerEvent::RxHints { hints: _ } => current_state, - _ => { - panic! {"unexpected event {:?} for state {:?}", current_state, event} - }, - }, - Stopping => match event { - ManagerEvent::RxHints { hints: _ } => current_state, - ManagerEvent::ConnectionLostFollower => Stopped, - ManagerEvent::ConnectionLostLeader => Stopped, - _ => { - panic! {"unexpected event {:?} for state {:?}", current_state, event} - }, - }, - Stopped => current_state, - }; - - let command_result = match command.clone() { - Some(command) => command_handler(command), - None => Ok(()), - }; - - match command_result { - Ok(_result) => { - self.state = Some(new_state); - log::debug!( - "processing event finished: state={}, command={}", - self.state.unwrap(), - command - .clone() - .map(|cmd| cmd.to_string()) - .unwrap_or("n/a".to_string()) - ); - }, - Err(wormhole_error) => { - panic!("processing event errored: {}", wormhole_error); - }, - }; - } - - pub(crate) fn is_done(&self) -> bool { - self.state == Option::from(State::Stopped) - } -} - -#[cfg(test)] -mod test { - use crate::core::{MySide, TheirSide}; - - use super::*; - - struct TestHandler { - command: Option, - } - - impl TestHandler { - fn new() -> Self { - TestHandler { command: None } - } - - fn handle_command(&mut self, command: ManagerCommand) -> Result<(), WormholeError> { - self.command = Some(command); - Ok(()) - } - } - - #[test] - fn test_manager_machine() { - // Sends Start event during construction: - let mut manager_fsm = - ManagerMachine::new(MySide::unchecked_from_string("test123".to_string())); - let side = MySide::generate(8); - - assert_eq!(manager_fsm.current_state(), Some(State::Wanting)); - assert_eq!(manager_fsm.is_done(), false); - - let mut handler = TestHandler::new(); - - // generate an input Event and see if we get the desired state and output Actions - manager_fsm.process( - ManagerEvent::RxPlease { - side: TheirSide::from("test"), - }, - &side, - &mut |cmd| handler.handle_command(cmd), - ); - - assert_eq!(manager_fsm.current_state(), Some(State::Connecting)); - assert_eq!( - handler.command, - Some(ManagerCommand::Protocol(ProtocolCommand::SendPlease { - side: side, - })) - ) - } - - #[test] - #[should_panic(expected = "Protocol error: foo")] - fn test_manager_machine_handle_error() { - let side = MySide::generate(8); - let mut manager_fsm = ManagerMachine { - side: side.clone(), - role: Role::Follower, - state: Some(State::Waiting), - }; - - assert_eq!(manager_fsm.current_state(), Some(State::Waiting)); - - manager_fsm.process(ManagerEvent::Start, &side, &mut |_cmd| { - Err(WormholeError::Protocol("foo".into())) - }); - } -} diff --git a/src/dilation/mod.rs b/src/dilation/mod.rs deleted file mode 100644 index f79caf41..00000000 --- a/src/dilation/mod.rs +++ /dev/null @@ -1,419 +0,0 @@ -use std::{cell::RefCell, rc::Rc}; - -use futures::executor; - -use crate::{ - core::{MySide, Phase}, - dilation::api::{ManagerCommand, ProtocolCommand}, - Wormhole, WormholeError, -}; - -#[cfg(test)] -use crate::core::protocol::MockWormholeProtocol; - -#[mockall_double::double] -use crate::dilation::manager::ManagerMachine; - -mod api; -mod events; -mod manager; - -#[mockall_double::double] -type WormholeConnection = WormholeConnectionDefault; - -pub struct WormholeConnectionDefault { - wormhole: Rc>, -} - -#[cfg_attr(test, mockall::automock)] -impl WormholeConnectionDefault { - fn new(wormhole: Wormhole) -> Self { - Self { - wormhole: Rc::new(RefCell::new(wormhole)), - } - } - - async fn receive_json(&self) -> Result - where - T: for<'a> serde::Deserialize<'a> + 'static, - { - let message = self.wormhole.borrow_mut().receive_json().await; - match message { - Ok(result) => match result { - Ok(result) => Ok(result), - Err(error) => Err(WormholeError::ProtocolJson(error)), - }, - Err(error) => Err(error), - } - } - - async fn send_json(&self, command: &ProtocolCommand) -> Result<(), WormholeError> { - self.wormhole - .borrow_mut() - .send_json_with_phase(command, Phase::dilation) - .await - } -} - -pub struct DilatedWormhole { - wormhole: WormholeConnection, - side: MySide, - manager: ManagerMachine, -} - -impl DilatedWormhole { - pub fn new(wormhole: Wormhole, side: MySide) -> Self { - DilatedWormhole { - wormhole: WormholeConnection::new(wormhole), - side: side.clone(), - manager: ManagerMachine::new(side.clone()), - } - } - - pub async fn run(&mut self) { - log::info!( - "start state machine: state={}", - &self.manager.current_state().unwrap() - ); - - let mut command_handler = |cmd| Self::execute_command(&self.wormhole, cmd); - - loop { - log::debug!("wait for next event"); - let event_result = self.wormhole.receive_json().await; - - match event_result { - Ok(manager_event) => { - log::debug!("received event"); - self.manager - .process(manager_event, &self.side, &mut command_handler) - }, - Err(error) => { - log::warn!("received error {}", error); - continue; - }, - }; - - if self.manager.is_done() { - log::debug!("exiting"); - break; - } - } - } - - fn execute_command( - wormhole: &WormholeConnection, - command: ManagerCommand, - ) -> Result<(), WormholeError> { - log::debug!("execute_command"); - match command { - ManagerCommand::Protocol(protocol_command) => { - log::debug!(" command: {}", protocol_command); - executor::block_on(wormhole.send_json(&protocol_command)) - }, - ManagerCommand::IO(io_command) => { - println!("io command: {}", io_command); - Ok(()) - }, - } - } -} - -#[cfg(test)] -mod test { - use crate::{ - core::test::init_logger, - dilation::{ - api::{IOCommand, ProtocolCommand}, - events::ManagerEvent, - manager::{MockManagerMachine, State}, - }, - }; - use std::sync::{Arc, Mutex}; - - use super::*; - - use mockall::predicate::{always, eq}; - - #[async_std::test] - async fn test_wormhole_connection_send() { - let mut protocol = MockWormholeProtocol::default(); - let command = ProtocolCommand::SendPlease { - side: MySide::generate(2), - }; - - let serialized_bytes = serde_json::to_vec(&command).unwrap(); - - protocol - .expect_send_with_phase() - .withf(move |bytes, provider| { - bytes == &serialized_bytes && provider(0) == Phase::dilation(0) - }) - .return_once(|_, _| Ok(())); - - let connection = WormholeConnectionDefault::new(Wormhole::new(Box::new(protocol))); - - let result = connection.send_json(&command).await; - - assert!(result.is_ok()) - } - - #[async_std::test] - async fn test_wormhole_connection_send_error() { - let mut protocol = MockWormholeProtocol::default(); - let command = ProtocolCommand::SendPlease { - side: MySide::generate(2), - }; - - protocol - .expect_send_with_phase() - .return_once(|_, _| Err(WormholeError::Protocol(Box::from("foo")))); - - let connection = WormholeConnectionDefault::new(Wormhole::new(Box::new(protocol))); - - let result = connection.send_json(&command).await; - - assert!(result.is_err()) - } - - #[async_std::test] - async fn test_wormhole_connection_receive() { - let mut protocol = MockWormholeProtocol::default(); - - let serialized_bytes = r#"{"type": "start"}"#.as_bytes().to_vec(); - - protocol - .expect_receive() - .return_once(|| Ok(serialized_bytes)); - - let connection = WormholeConnectionDefault::new(Wormhole::new(Box::new(protocol))); - - let result = connection.receive_json::().await; - - assert!(result.is_ok()) - } - - #[async_std::test] - async fn test_wormhole_connection_receive_error() { - let mut protocol = MockWormholeProtocol::default(); - - protocol - .expect_receive() - .return_once(|| Err(WormholeError::Protocol(Box::from("foo")))); - - let connection = WormholeConnectionDefault::new(Wormhole::new(Box::new(protocol))); - - let result = connection.receive_json::().await; - - assert!(result.is_err()) - } - - #[async_std::test] - async fn test_wormhole_connection_receive_deserialization_error() { - let mut protocol = MockWormholeProtocol::default(); - - let serialized_bytes = r#"{"type": "foo"}"#.as_bytes().to_vec(); - - protocol - .expect_receive() - .return_once(|| Ok(serialized_bytes)); - - let connection = WormholeConnectionDefault::new(Wormhole::new(Box::new(protocol))); - - let result = connection.receive_json::().await; - - assert!(result.is_err()) - } - - #[async_std::test] - async fn test_dilated_wormhole_new() { - let wc_ctx = MockWormholeConnectionDefault::new_context(); - wc_ctx - .expect() - .with(always()) - .return_once(move |_| WormholeConnection::default()); - - let mm_ctx = MockManagerMachine::new_context(); - mm_ctx - .expect() - .with(always()) - .return_once(move |_| ManagerMachine::default()); - } - - #[async_std::test] - async fn test_dilated_wormhole() { - init_logger(); - - let mut manager = ManagerMachine::default(); - let mut wormhole = WormholeConnection::default(); - - let my_side = MySide::generate(23); - - manager - .expect_current_state() - .return_once(|| Some(State::Wanting)); - - wormhole - .expect_receive_json() - .return_once(|| Ok(ManagerEvent::Start)); - - manager - .expect_process() - .with(eq(ManagerEvent::Start), eq(my_side.clone()), always()) - .times(1) - .return_once(|_, _, _| ()); - - manager.expect_is_done().return_once(|| true); - - let mut dilated_wormhole = DilatedWormhole { - manager, - side: my_side, - wormhole, - }; - - dilated_wormhole.run().await; - } - - #[async_std::test] - async fn test_dilated_wormhole_receving_error() { - init_logger(); - - let mut manager = ManagerMachine::default(); - let mut wormhole = WormholeConnection::default(); - - let my_side = MySide::generate(23); - - manager - .expect_current_state() - .return_once(|| Some(State::Wanting)); - - let mut events = vec![Ok(ManagerEvent::Start), Err(WormholeError::DilationVersion)]; - wormhole - .expect_receive_json() - .returning(move || events.pop().unwrap()); - - manager - .expect_process() - .with(eq(ManagerEvent::Start), eq(my_side.clone()), always()) - .times(1) - .return_once(|_, _, _| ()); - - manager.expect_is_done().return_once(|| true); - - let mut dilated_wormhole = DilatedWormhole { - manager, - side: my_side, - wormhole, - }; - - dilated_wormhole.run().await; - } - - #[async_std::test] - async fn test_dilated_wormhole_two_iterations() { - init_logger(); - - let mut manager = ManagerMachine::default(); - let mut wormhole = WormholeConnection::default(); - - let my_side = MySide::generate(23); - - manager - .expect_current_state() - .return_once(|| Some(State::Wanting)); - - let mut events = vec![Ok(ManagerEvent::Stop), Ok(ManagerEvent::Start)]; - wormhole - .expect_receive_json() - .times(2) - .returning(move || events.pop().unwrap()); - - let verify_events = Arc::new(Mutex::new(vec![ManagerEvent::Stop, ManagerEvent::Start])); - let verify_my_side = my_side.clone(); - manager - .expect_process() - .withf(move |event, side, _| { - *event == verify_events.lock().unwrap().pop().unwrap() && side == &verify_my_side - }) - .times(2) - .returning(|_, _, _| ()); - - let mut returns = vec![true, false]; - manager - .expect_is_done() - .returning(move || returns.pop().unwrap()); - - let mut dilated_wormhole = DilatedWormhole { - manager, - side: my_side.clone(), - wormhole, - }; - - dilated_wormhole.run().await; - } - - #[test] - fn test_dilated_wormhole_execute_protocol_command() { - init_logger(); - - let mut wormhole = WormholeConnection::default(); - - let protocol_command = ProtocolCommand::SendPlease { - side: MySide::generate(2), - }; - - wormhole - .expect_send_json() - .with(eq(protocol_command.clone())) - .return_once(|_| Ok(())) - .times(1); - - let result = DilatedWormhole::execute_command( - &mut wormhole, - ManagerCommand::Protocol(protocol_command), - ); - - assert!(result.is_ok()) - } - - #[test] - fn test_dilated_wormhole_execute_protocol_command_failure() { - init_logger(); - - let mut wormhole = WormholeConnection::default(); - - let protocol_command = ProtocolCommand::SendPlease { - side: MySide::generate(2), - }; - - let protocol_command_ref = protocol_command.clone(); - wormhole - .expect_send_json() - .with(eq(protocol_command_ref)) - .return_once(|_| Err(WormholeError::Crypto)) - .times(1); - - let result = DilatedWormhole::execute_command( - &mut wormhole, - ManagerCommand::Protocol(protocol_command.clone()), - ); - - assert!(result.is_err()) - } - - #[test] - fn test_dilated_wormhole_execute_io_command() { - init_logger(); - - let mut wormhole = WormholeConnection::default(); - - wormhole.expect_send_json().times(0); - - let result = DilatedWormhole::execute_command( - &mut wormhole, - ManagerCommand::IO(IOCommand::CloseConnection), - ); - - assert!(result.is_ok()) - } -} diff --git a/src/forwarding.rs b/src/forwarding.rs index 51892ea6..03dc3a8e 100644 --- a/src/forwarding.rs +++ b/src/forwarding.rs @@ -13,21 +13,18 @@ //! and received as they come in, no additional buffering is applied. (Under the assumption that those applications //! that need buffering already do it on their side, and those who don't, don't.) +use super::*; +use async_std::net::{TcpListener, TcpStream}; +use futures::{AsyncReadExt, AsyncWriteExt, Future, SinkExt, StreamExt, TryStreamExt}; +use serde::{Deserialize, Serialize}; use std::{ borrow::Cow, collections::{HashMap, HashSet}, rc::Rc, sync::Arc, }; - -use async_std::net::{TcpListener, TcpStream}; -use futures::{AsyncReadExt, AsyncWriteExt, Future, SinkExt, StreamExt, TryStreamExt}; -use serde::{Deserialize, Serialize}; - use transit::{TransitConnectError, TransitError}; -use super::*; - const APPID_RAW: &str = "piegames.de/wormhole/port-forwarding"; /// The App ID associated with this protocol. @@ -44,7 +41,6 @@ pub const APP_CONFIG: crate::AppConfig = crate::AppConfig::, u16)>, cancel: impl Future, ) -> Result<(), ForwardingError> { - let our_version = wormhole - .our_version() - .downcast_ref::() - .expect("You may only use a Wormhole instance with the correct AppVersion type!") - .to_owned(); - let peer_version: AppVersion = serde_json::from_value(wormhole.peer_version().to_owned())?; + let our_version: &AppVersion = wormhole + .our_version + .downcast_ref() + .expect("You may only use a Wormhole instance with the correct AppVersion type!"); + let peer_version: AppVersion = serde_json::from_value(wormhole.peer_version.clone())?; let connector = transit::init( our_version.transit_abilities, Some(peer_version.transit_abilities), relay_hints, ) .await?; + /* Send our transit hints */ wormhole .send_json(&PeerMessage::Transit { @@ -171,7 +167,7 @@ pub async fn serve( log::warn!("It seems like you are trying to forward a remote HTTP target ('{}'). Due to HTTP being host-aware this will very likely fail!", host); } (format!("{}:{}", host, port), (Some(host), port)) - } + }, None => (port.to_string(), (host, port)), }) .collect(); @@ -528,10 +524,10 @@ pub async fn connect( custom_ports: &[u16], ) -> Result { let our_version: &AppVersion = wormhole - .our_version() + .our_version .downcast_ref() .expect("You may only use a Wormhole instance with the correct AppVersion type!"); - let peer_version: AppVersion = serde_json::from_value(wormhole.peer_version().to_owned())?; + let peer_version: AppVersion = serde_json::from_value(wormhole.peer_version.clone())?; let connector = transit::init( our_version.transit_abilities, Some(peer_version.transit_abilities), diff --git a/src/lib.rs b/src/lib.rs index 969731a0..28864a26 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,10 +27,6 @@ #[macro_use] mod util; mod core; -#[cfg(feature = "dilation")] -pub mod dilated_transfer; -#[cfg(feature = "dilation")] -pub mod dilation; #[cfg(feature = "forwarding")] pub mod forwarding; #[cfg(feature = "transfer")] @@ -45,6 +41,3 @@ pub use crate::core::{ rendezvous, AppConfig, AppID, Code, MailboxConnection, Mood, Nameplate, Wormhole, WormholeError, }; - -#[cfg(feature = "dilation")] -pub use crate::dilation::DilatedWormhole; diff --git a/src/transfer.rs b/src/transfer.rs index 064f0cf2..64fd547e 100644 --- a/src/transfer.rs +++ b/src/transfer.rs @@ -33,7 +33,7 @@ mod v2; pub use v1::ReceiveRequest as ReceiveRequestV1; pub use v2::ReceiveRequest as ReceiveRequestV2; -use crate::core::APPID_RAW; +const APPID_RAW: &str = "lothar.com/wormhole/text-or-file-xfer"; /// The App ID associated with this protocol. pub const APPID: AppID = AppID(Cow::Borrowed(APPID_RAW)); @@ -46,7 +46,6 @@ pub const APP_CONFIG: crate::AppConfig = crate::AppConfig::>, -// transit_abilities: Vec, -// } - -// impl AppVersionTransferV2Hint { -// const fn new() -> Self { -// Self { -// supported_formats: vec![Cow::Borrowed("tar.zst")], -// transit_abilities: Ability::all_abilities(), -// } -// } -// } - -// impl Default for AppVersionTransferV2Hint { -// fn default() -> Self { -// Self::new() -// } -// } - -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub struct AppVersionTransferV2Hint { supported_formats: Cow<'static, [Cow<'static, str>]>, @@ -829,7 +806,7 @@ pub async fn send( progress_handler: impl FnMut(u64, u64) + 'static, cancel: impl Future, ) -> Result<(), TransferError> { - let peer_version: AppVersion = serde_json::from_value(wormhole.peer_version().clone())?; + let peer_version: AppVersion = serde_json::from_value(wormhole.peer_version.clone())?; if peer_version.supports_v2() { v2::send( wormhole, @@ -870,7 +847,7 @@ pub async fn request( transit_abilities: transit::Abilities, cancel: impl Future, ) -> Result, TransferError> { - let peer_version: AppVersion = serde_json::from_value(wormhole.peer_version().clone())?; + let peer_version: AppVersion = serde_json::from_value(wormhole.peer_version.clone())?; if peer_version.supports_v2() { v2::request( wormhole, diff --git a/src/transfer/cancel.rs b/src/transfer/cancel.rs index 5199fff4..10a75219 100644 --- a/src/transfer/cancel.rs +++ b/src/transfer/cancel.rs @@ -103,7 +103,7 @@ pub async fn handle_run_result( result: Result<(Result<(), TransferError>, impl Future), Cancelled>, ) -> Result<(), TransferError> { match handle_run_result_noclose(wormhole, result).await { - Ok(Some(((), mut wormhole, cancel))) => { + Ok(Some(((), wormhole, cancel))) => { /* Happy case: everything went okay. Now close the wormholhe */ log::debug!("Transfer done, doing cleanup logic"); wrap_timeout( diff --git a/src/transfer/v2.rs b/src/transfer/v2.rs index bfa6123d..f4fc4f88 100644 --- a/src/transfer/v2.rs +++ b/src/transfer/v2.rs @@ -163,7 +163,7 @@ pub async fn send( futures::pin_mut!(cancel); /* Establish transit connection, close the Wormhole and switch to using the transit connection (msgpack instead of json) */ - let (mut transit, mut wormhole, cancel) = cancel::with_cancel_wormhole!( + let (mut transit, wormhole, cancel) = cancel::with_cancel_wormhole!( wormhole, run = async { Ok(make_transit( @@ -343,7 +343,7 @@ pub async fn request( futures::pin_mut!(cancel); /* Establish transit connection, close the Wormhole and switch to using the transit connection (msgpack instead of json) */ - let ((mut transit, info), mut wormhole, cancel) = cancel::with_cancel_wormhole!( + let ((mut transit, info), wormhole, cancel) = cancel::with_cancel_wormhole!( wormhole, run = async { make_transit( diff --git a/src/transit.rs b/src/transit.rs index f414f465..33391735 100644 --- a/src/transit.rs +++ b/src/transit.rs @@ -14,7 +14,6 @@ //! "leader" side and one "follower" side (formerly called "sender" and "receiver"). use crate::{util, Key, KeyPurpose}; -use derive_more::Display; use serde_derive::{Deserialize, Serialize}; #[cfg(not(target_family = "wasm"))] @@ -219,6 +218,18 @@ impl<'de> serde::Deserialize<'de> for Abilities { where D: serde::Deserializer<'de>, { + #[derive(Deserialize)] + #[serde(rename_all = "kebab-case", tag = "type")] + enum Ability { + DirectTcpV1, + RelayV1, + RelayV2, + #[cfg(all())] + NoiseCryptoV1, + #[serde(other)] + Other, + } + let mut abilities = Self::default(); /* Specifying a hint multiple times is undefined behavior. Here, we simply merge all features. */ for ability in as serde::Deserialize>::deserialize(de)? { @@ -252,8 +263,7 @@ enum HintSerde { } /** Information about how to find a peer */ -#[derive(Clone, Display, Debug, Default, PartialEq)] -#[display(fmt = "Hints(direct: {:?}, relay: {:?})", "&direct_tcp", "&relay")] +#[derive(Clone, Debug, Default)] pub struct Hints { /** Hints for direct connection */ pub direct_tcp: HashSet, @@ -545,8 +555,6 @@ impl<'de> serde::Deserialize<'de> for RelayHint { } } -use crate::core::Ability; - impl TryFrom<&DirectHint> for IpAddr { type Error = std::net::AddrParseError; fn try_from(hint: &DirectHint) -> Result {