diff --git a/Cargo.lock b/Cargo.lock index 2ebedf5d..dcae111d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2581,6 +2581,8 @@ dependencies = [ "libp2p-swarm", "libp2p-tcp", "libp2p-upnp", + "libp2p-websocket", + "libp2p-websocket-websys", "libp2p-webtransport-websys", "libp2p-yamux", "multiaddr", @@ -2982,6 +2984,42 @@ dependencies = [ "void", ] +[[package]] +name = "libp2p-websocket" +version = "0.43.1" +source = "git+https://github.com/libp2p/rust-libp2p?rev=60e32c9d3b4bee42ac698791e917e1587eaa1388#60e32c9d3b4bee42ac698791e917e1587eaa1388" +dependencies = [ + "either", + "futures", + "futures-rustls", + "libp2p-core", + "libp2p-identity", + "parking_lot", + "pin-project-lite", + "rw-stream-sink", + "soketto", + "tracing", + "url", + "webpki-roots 0.25.4", +] + +[[package]] +name = "libp2p-websocket-websys" +version = "0.3.2" +source = "git+https://github.com/libp2p/rust-libp2p?rev=60e32c9d3b4bee42ac698791e917e1587eaa1388#60e32c9d3b4bee42ac698791e917e1587eaa1388" +dependencies = [ + "bytes", + "futures", + "js-sys", + "libp2p-core", + "parking_lot", + "send_wrapper 0.6.0", + "thiserror", + "tracing", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "libp2p-webtransport-websys" version = "0.3.0" @@ -3133,6 +3171,8 @@ dependencies = [ "redb", "rexie", "rstest", + "rustls-pemfile", + "rustls-pki-types", "send_wrapper 0.6.0", "serde", "serde-wasm-bindgen", @@ -4449,7 +4489,7 @@ dependencies = [ "rustls-webpki 0.102.4", "security-framework", "security-framework-sys", - "webpki-roots", + "webpki-roots 0.26.2", "winapi", ] @@ -5565,6 +5605,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" + [[package]] name = "webpki-roots" version = "0.26.2" diff --git a/node-wasm/Cargo.toml b/node-wasm/Cargo.toml index 82f3fb54..4be729b5 100644 --- a/node-wasm/Cargo.toml +++ b/node-wasm/Cargo.toml @@ -50,7 +50,6 @@ wasm-bindgen = "0.2.92" wasm-bindgen-futures = "0.4.42" web-sys = { version = "0.3.69", features = [ "BroadcastChannel", - "Crypto", "DedicatedWorkerGlobalScope", "Headers", "MessageEvent", @@ -66,6 +65,7 @@ web-sys = { version = "0.3.69", features = [ "Window", "Worker", "WorkerGlobalScope", + "WorkerNavigator", "WorkerOptions", "WorkerType", ] } diff --git a/node-wasm/src/node.rs b/node-wasm/src/node.rs index 52f6a361..8f65a7a7 100644 --- a/node-wasm/src/node.rs +++ b/node-wasm/src/node.rs @@ -15,8 +15,8 @@ use lumina_node::store::IndexedDbStore; use crate::error::{Context, Result}; use crate::utils::{ - is_chrome, js_value_from_display, request_storage_persistence, resolve_dnsaddr_multiaddress, - Network, + is_safari, js_value_from_display, request_storage_persistence, resolve_dnsaddr_multiaddress, + shared_workers_supported, Network, }; use crate::worker::commands::{CheckableResponseExt, NodeCommand, SingleHeaderQuery}; use crate::worker::{AnyWorker, WorkerClient}; @@ -92,17 +92,17 @@ impl NodeDriver { worker_script_url: &str, worker_type: Option, ) -> Result { - if let Err(e) = request_storage_persistence().await { - error!("Error requesting storage persistence: {e}"); + // Safari doesn't have the `navigator.storage()` api + if !is_safari()? { + if let Err(e) = request_storage_persistence().await { + error!("Error requesting storage persistence: {e}"); + } } - // For chrome we default to running in a dedicated Worker because: - // 1. Chrome Android does not support SharedWorkers at all - // 2. On desktop Chrome, restarting Lumina's worker causes all network connections to fail. - let default_worker_type = if is_chrome().unwrap_or(false) { - NodeWorkerKind::Dedicated - } else { + let default_worker_type = if shared_workers_supported().unwrap_or(false) { NodeWorkerKind::Shared + } else { + NodeWorkerKind::Dedicated }; let worker = AnyWorker::new( diff --git a/node-wasm/src/utils.rs b/node-wasm/src/utils.rs index 3c41180b..a94d919a 100644 --- a/node-wasm/src/utils.rs +++ b/node-wasm/src/utils.rs @@ -3,6 +3,7 @@ use std::borrow::Cow; use std::fmt::{self, Debug}; use std::net::{IpAddr, Ipv4Addr}; +use js_sys::Math; use libp2p::multiaddr::Protocol; use libp2p::{Multiaddr, PeerId}; use lumina_node::network; @@ -18,8 +19,8 @@ use tracing_web::{performance_layer, MakeConsoleWriter}; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; use web_sys::{ - Crypto, DedicatedWorkerGlobalScope, Navigator, Request, RequestInit, RequestMode, Response, - SharedWorker, SharedWorkerGlobalScope, Worker, + DedicatedWorkerGlobalScope, Request, RequestInit, RequestMode, Response, SharedWorker, + SharedWorkerGlobalScope, Worker, }; use crate::error::{Context, Error, Result}; @@ -161,6 +162,16 @@ where /// have. This function doesn't `await` on JavaScript promise, as that would block until user /// either allows or blocks our request in a prompt (and we cannot do much with the result anyway). pub(crate) async fn request_storage_persistence() -> Result<(), Error> { + let storage_manager = if let Some(window) = web_sys::window() { + window.navigator().storage() + } else if Worker::is_worker_type() { + Worker::worker_self().navigator().storage() + } else if SharedWorker::is_worker_type() { + SharedWorker::worker_self().navigator().storage() + } else { + return Err(Error::new("`navigator.storage` not found in global scope")); + }; + let fullfiled = Closure::once(move |granted: JsValue| { if granted.is_truthy() { info!("Storage persistence acquired: {:?}", granted); @@ -173,10 +184,7 @@ pub(crate) async fn request_storage_persistence() -> Result<(), Error> { }); // don't drop the promise, we'll log the result and hope the user clicked the right button - let _promise = get_navigator()? - .storage() - .persist()? - .then2(&fullfiled, &rejected); + let _promise = storage_manager.persist()?.then2(&fullfiled, &rejected); // stop rust from dropping them fullfiled.forget(); @@ -186,29 +194,50 @@ pub(crate) async fn request_storage_persistence() -> Result<(), Error> { } const CHROME_USER_AGENT_DETECTION_STR: &str = "Chrome/"; +const FIREFOX_USER_AGENT_DETECTION_STR: &str = "Firefox/"; +const SAFARI_USER_AGENT_DETECTION_STR: &str = "Safari/"; -// Currently, there's an issue with SharedWorkers on Chrome where restarting Lumina's worker -// causes all network connections to fail. Until that's resolved detect chrome and apply -// a workaround. +pub(crate) fn get_user_agent() -> Result { + if let Some(window) = web_sys::window() { + Ok(window.navigator().user_agent()?) + } else if Worker::is_worker_type() { + Ok(Worker::worker_self().navigator().user_agent()?) + } else if SharedWorker::is_worker_type() { + Ok(SharedWorker::worker_self().navigator().user_agent()?) + } else { + Err(Error::new( + "`navigator.user_agent` not found in global scope", + )) + } +} + +#[allow(dead_code)] pub(crate) fn is_chrome() -> Result { - get_navigator()? - .user_agent() - .context("could not get UserAgent from Navigator") - .map(|user_agent| user_agent.contains(CHROME_USER_AGENT_DETECTION_STR)) + let user_agent = get_user_agent()?; + Ok(user_agent.contains(CHROME_USER_AGENT_DETECTION_STR)) +} + +pub(crate) fn is_firefox() -> Result { + let user_agent = get_user_agent()?; + Ok(user_agent.contains(FIREFOX_USER_AGENT_DETECTION_STR)) +} + +pub(crate) fn is_safari() -> Result { + let user_agent = get_user_agent()?; + // Chrome contains `Safari/`, so make sure user agent doesn't contain `Chrome/` + Ok(user_agent.contains(SAFARI_USER_AGENT_DETECTION_STR) + && !user_agent.contains(CHROME_USER_AGENT_DETECTION_STR)) } -pub(crate) fn get_navigator() -> Result { - js_sys::Reflect::get(&js_sys::global(), &JsValue::from_str("navigator")) - .context("failed to get `navigator` from global object")? - .dyn_into::() - .context("`navigator` is not instanceof `Navigator`") +pub(crate) fn shared_workers_supported() -> Result { + // For chrome we default to running in a dedicated Worker because: + // 1. Chrome Android does not support SharedWorkers at all + // 2. On desktop Chrome, restarting Lumina's worker causes all network connections to fail. + Ok(is_firefox()? || is_safari()?) } -pub(crate) fn get_crypto() -> Result { - js_sys::Reflect::get(&js_sys::global(), &JsValue::from_str("crypto")) - .context("failed to get `crypto` from global object")? - .dyn_into::() - .context("`crypto` is not `Crypto` type") +pub(crate) fn random_id() -> u32 { + (Math::random() * f64::from(u32::MAX)).floor() as u32 } async fn fetch(url: &str, opts: &RequestInit, headers: &[(&str, &str)]) -> Result { diff --git a/node-wasm/src/worker.rs b/node-wasm/src/worker.rs index b7a3bcae..c566c63e 100644 --- a/node-wasm/src/worker.rs +++ b/node-wasm/src/worker.rs @@ -17,7 +17,7 @@ use lumina_node::store::{IndexedDbStore, SamplingMetadata, Store}; use crate::error::{Context, Error, Result}; use crate::node::WasmNodeConfig; -use crate::utils::{get_crypto, WorkerSelf}; +use crate::utils::{random_id, WorkerSelf}; use crate::worker::channel::{ DedicatedWorkerMessageServer, MessageServer, SharedWorkerMessageServer, WorkerMessage, }; @@ -240,7 +240,7 @@ impl NodeWorker { pub async fn run_worker(queued_events: Vec) -> Result<()> { info!("Entered run_worker"); let (tx, mut rx) = mpsc::channel(WORKER_MESSAGE_SERVER_INCOMING_QUEUE_LENGTH); - let events_channel_name = format!("NodeEventChannel-{}", get_crypto()?.random_uuid()); + let events_channel_name = format!("NodeEventChannel-{}", random_id()); let mut message_server: Box = if SharedWorker::is_worker_type() { Box::new(SharedWorkerMessageServer::new(tx.clone(), queued_events)) diff --git a/node/Cargo.toml b/node/Cargo.toml index dad83eb0..596876ea 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -42,7 +42,11 @@ instant = "0.1.13" prost = "0.12.6" rand = "0.8.5" serde = { version = "1.0.203", features = ["derive"] } -smallvec = { version = "1.13.2", features = ["union", "const_generics", "serde"] } +smallvec = { version = "1.13.2", features = [ + "union", + "const_generics", + "serde", +] } thiserror = "1.0.61" tokio = { version = "1.38.0", features = ["macros", "sync"] } tokio-util = "0.7.11" @@ -52,16 +56,19 @@ web-time = "1.1.0" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] backoff = { version = "0.4.0", features = ["tokio"] } blockstore = { workspace = true, features = ["redb"] } -tokio = { version = "1.38.0", features = ["rt-multi-thread", "time"] } +tokio = { version = "1.38.0", features = ["fs", "rt-multi-thread", "time"] } libp2p = { workspace = true, features = [ "noise", "dns", "tcp", "tokio", "yamux", + "websocket", "quic", ] } redb = "2.1.1" +rustls-pemfile = "2.1.2" +rustls-pki-types = "1.7.0" [target.'cfg(target_arch = "wasm32")'.dependencies] backoff = { version = "0.4.0", features = ["wasm-bindgen"] } @@ -71,8 +78,11 @@ celestia-types = { workspace = true, features = ["wasm-bindgen"] } getrandom = { version = "0.2.15", features = ["js"] } gloo-timers = { version = "0.3.0", features = ["futures"] } libp2p = { workspace = true, features = [ + "noise", "wasm-bindgen", "webtransport-websys", + "websocket-websys", + "yamux", ] } pin-project = "1.1.5" rexie = "0.5.0" diff --git a/node/src/node.rs b/node/src/node.rs index 16a26253..8c71f187 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -118,15 +118,18 @@ where let event_sub = event_channel.subscribe(); let store = Arc::new(config.store); - let p2p = Arc::new(P2p::start(P2pArgs { - network_id: config.network_id, - local_keypair: config.p2p_local_keypair, - bootnodes: config.p2p_bootnodes, - listen_on: config.p2p_listen_on, - blockstore: config.blockstore, - store: store.clone(), - event_pub: event_channel.publisher(), - })?); + let p2p = Arc::new( + P2p::start(P2pArgs { + network_id: config.network_id, + local_keypair: config.p2p_local_keypair, + bootnodes: config.p2p_bootnodes, + listen_on: config.p2p_listen_on, + blockstore: config.blockstore, + store: store.clone(), + event_pub: event_channel.publisher(), + }) + .await?, + ); let syncer = Arc::new(Syncer::start(SyncerArgs { store: store.clone(), diff --git a/node/src/p2p.rs b/node/src/p2p.rs index 8e3b0227..0a11097f 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -53,6 +53,7 @@ use tracing::{debug, error, info, instrument, trace, warn}; mod header_ex; pub(crate) mod header_session; +mod kademlia; pub(crate) mod shwap; mod swarm; @@ -87,7 +88,7 @@ pub(crate) const GET_SAMPLE_TIMEOUT: Duration = Duration::from_secs(10); // will be ignored const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20; -type Result = std::result::Result; +pub(crate) type Result = std::result::Result; /// Representation of all the errors that can occur in `P2p` component. #[derive(Debug, thiserror::Error)] @@ -96,6 +97,10 @@ pub enum P2pError { #[error("Failed to initialize gossipsub behaviour: {0}")] GossipsubInit(String), + /// Failed to initialize TLS. + #[error("Failed to initialize TLS: {0}")] + TlsInit(String), + /// Failed to initialize noise protocol. #[error("Failed to initialize noise: {0}")] NoiseInit(String), @@ -145,6 +150,7 @@ impl P2pError { match self { P2pError::GossipsubInit(_) | P2pError::NoiseInit(_) + | P2pError::TlsInit(_) | P2pError::WorkerDied | P2pError::ChannelClosedUnexpectedly | P2pError::BootnodeAddrsWithoutPeerId(_) => true, @@ -232,7 +238,7 @@ pub(crate) enum P2pCmd { impl P2p { /// Creates and starts a new p2p handler. - pub fn start(args: P2pArgs) -> Result + pub async fn start(args: P2pArgs) -> Result where B: Blockstore + 'static, S: Store + 'static, @@ -245,7 +251,7 @@ impl P2p { let peer_tracker_info_watcher = peer_tracker.info_watcher(); let (cmd_tx, cmd_rx) = mpsc::channel(16); - let mut worker = Worker::new(args, cmd_rx, peer_tracker)?; + let mut worker = Worker::new(args, cmd_rx, peer_tracker).await?; spawn(async move { worker.run().await; @@ -566,7 +572,7 @@ where identify: identify::Behaviour, header_ex: HeaderExBehaviour, gossipsub: gossipsub::Behaviour, - kademlia: kad::Behaviour, + kademlia: kademlia::Behaviour, } struct Worker @@ -597,7 +603,7 @@ where B: Blockstore, S: Store, { - fn new( + async fn new( args: P2pArgs, cmd_rx: mpsc::Receiver, peer_tracker: Arc, @@ -636,7 +642,7 @@ where kademlia, }; - let mut swarm = new_swarm(args.local_keypair, behaviour)?; + let mut swarm = new_swarm(args.local_keypair, behaviour).await?; for addr in args.listen_on { if let Err(e) = swarm.listen_on(addr.clone()) { @@ -1167,7 +1173,7 @@ where Ok(gossipsub) } -fn init_kademlia(args: &P2pArgs) -> Result> +fn init_kademlia(args: &P2pArgs) -> Result where B: Blockstore, S: Store, @@ -1190,7 +1196,7 @@ where kademlia.set_mode(Some(kad::Mode::Server)); } - Ok(kademlia) + Ok(kademlia::Behaviour::new(kademlia)) } fn init_bitswap( diff --git a/node/src/p2p/kademlia.rs b/node/src/p2p/kademlia.rs new file mode 100644 index 00000000..83f002c6 --- /dev/null +++ b/node/src/p2p/kademlia.rs @@ -0,0 +1,189 @@ +use std::ops::{Deref, DerefMut}; +use std::task::{Context, Poll}; + +use libp2p::{ + core::Endpoint, + kad, + multiaddr::{Multiaddr, Protocol}, + swarm::{ + ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, + }, + PeerId, +}; + +type Kad = kad::Behaviour; + +pub(crate) struct Behaviour(Kad); + +impl Behaviour { + pub(crate) fn new(inner: Kad) -> Behaviour { + Behaviour(inner) + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = THandler; + type ToSwarm = kad::Event; + + fn handle_pending_outbound_connection( + &mut self, + connection_id: ConnectionId, + maybe_peer: Option, + addresses: &[Multiaddr], + effective_role: Endpoint, + ) -> Result, ConnectionDenied> { + // `libp2p::kad::Behaviour` uses the following flow to dial a peer: + // + // 1. Kad's Behaviour discovers a new peer and its addresses. + // 2. Kad's Behaviour dials with `DialOpts::peer_id()` without setting the addresses. + // 3. `Swarm` calls `Behaviour::handle_pending_outbound_connection` to resolve the addresses. + // 4. Kad's Behaviour returns the vector of discovered addresses. + // 5. The addresses returned from the above step are NOT passed to any other Behaviour's. + // + // Here we intercept the return of Kad's Behaviour and cononicalize all `/tls/ws` + // addresses to `/wss` addresses. + let mut new_addrs = self.0.handle_pending_outbound_connection( + connection_id, + maybe_peer, + addresses, + effective_role, + )?; + + for addr in new_addrs.iter_mut() { + if let Some(new_addr) = tls_ws_to_wss(addr) { + *addr = new_addr; + } + } + + for addr in addresses { + if let Some(new_addr) = tls_ws_to_wss(addr) { + new_addrs.push(new_addr); + } + } + + Ok(new_addrs) + } + + fn handle_pending_inbound_connection( + &mut self, + connection_id: ConnectionId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + self.0 + .handle_pending_inbound_connection(connection_id, local_addr, remote_addr) + } + + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + self.0 + .handle_established_inbound_connection(connection_id, peer, local_addr, remote_addr) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: Endpoint, + ) -> Result, ConnectionDenied> { + self.0 + .handle_established_outbound_connection(connection_id, peer, addr, role_override) + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + self.0 + .on_connection_handler_event(peer_id, connection_id, event) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + self.0.on_swarm_event(event) + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + self.0.poll(cx) + } +} + +impl Deref for Behaviour { + type Target = Kad; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for Behaviour { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +fn tls_ws_to_wss(addr: &Multiaddr) -> Option { + let tls_idx = addr.iter().position(|proto| proto == Protocol::Tls)?; + + let Some(Protocol::Ws(s)) = addr.iter().nth(tls_idx + 1) else { + return None; + }; + + let addr = addr + .iter() + .enumerate() + .fold(Multiaddr::empty(), |mut addr, (i, proto)| { + if i == tls_idx { + // Skip Protocol::Tls + } else if i == tls_idx + 1 { + // Replace Protocol::Ws with Protocol::Wss + addr.push(Protocol::Wss(s.clone())); + } else { + addr.push(proto); + } + + addr + }); + + Some(addr) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::async_test; + + #[async_test] + async fn tls_ws() { + let addr = "/dns4/dev.lumina.eiger.co/tcp/2121/tls/ws/p2p/12D3KooWJF4tkwBrycYhriE4nuYAo3Y8DESQzdN2tPWwndWe4KUd".parse().unwrap(); + let expected_addr = "/dns4/dev.lumina.eiger.co/tcp/2121/wss/p2p/12D3KooWJF4tkwBrycYhriE4nuYAo3Y8DESQzdN2tPWwndWe4KUd".parse().unwrap(); + + let new_addr = tls_ws_to_wss(&addr).unwrap(); + assert_eq!(new_addr, expected_addr); + } + + #[async_test] + async fn non_translatable() { + let addrs = [ + "/dns4/dev.lumina.eiger.co/tcp/2121/tls/p2p/12D3KooWJF4tkwBrycYhriE4nuYAo3Y8DESQzdN2tPWwndWe4KUd", + "/dns4/dev.lumina.eiger.co/tcp/2121/wss/p2p/12D3KooWJF4tkwBrycYhriE4nuYAo3Y8DESQzdN2tPWwndWe4KUd", + "/dns4/dev.lumina.eiger.co/tcp/2121/p2p/12D3KooWJF4tkwBrycYhriE4nuYAo3Y8DESQzdN2tPWwndWe4KUd", + ]; + + for addr in addrs { + let addr = addr.parse().unwrap(); + assert!(tls_ws_to_wss(&addr).is_none()); + } + } +} diff --git a/node/src/p2p/swarm.rs b/node/src/p2p/swarm.rs index ab800005..1283bd86 100644 --- a/node/src/p2p/swarm.rs +++ b/node/src/p2p/swarm.rs @@ -1,53 +1,117 @@ -use libp2p::{identity::Keypair, swarm::NetworkBehaviour, Swarm, SwarmBuilder}; +use libp2p::identity::Keypair; +use libp2p::swarm::{NetworkBehaviour, Swarm}; use web_time::Duration; -use crate::p2p::P2pError; +use crate::p2p::{P2pError, Result}; pub(crate) use self::imp::new_swarm; #[cfg(not(target_arch = "wasm32"))] mod imp { + use std::env; + use std::io::Cursor; + use std::path::Path; + + use futures::future::Either; + use libp2p::core::muxing::StreamMuxerBox; + use libp2p::core::upgrade::Version; + use libp2p::{dns, noise, quic, swarm, tcp, websocket, yamux, PeerId, Transport}; + use rustls_pki_types::{CertificateDer, PrivateKeyDer}; + use tokio::fs; + use super::*; - use libp2p::{dns, noise, tcp, yamux}; - pub(crate) fn new_swarm(keypair: Keypair, behaviour: B) -> Result, P2pError> + pub(crate) async fn new_swarm(keypair: Keypair, behaviour: B) -> Result> where B: NetworkBehaviour, { - Ok(SwarmBuilder::with_existing_identity(keypair) - .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - )? - .with_quic() - // We do not use system's DNS because libp2p loads DNS servers only when - // `Swarm` get constructed. This is not a problem for server machines, but - // it is for movable machines such as laptops and smart phones. Because of - // that, the following edge cases can happen: - // - // 1. Machine connects to WiFi A. Our node starts and uses DNS that - // WiFi A announced. Then machine moves to WiFi B, the old DNS servers - // become unreachable, but libp2p still uses the old ones. - // 2. Machine is not connected to the Internet. Our node starts and does not - // find any DNS servers defined. Then machine connects to the Internet, but - // libp2p still does not have any DNS nameservers defined. - // - // By having a pre-defined public servers, these edge cases solved. - .with_dns_config( - dns::ResolverConfig::cloudflare(), + let tls_key = match env::var("LUMINA_TLS_KEY_FILE") { + Ok(path) => Some(read_tls_key(path).await?), + Err(_) => None, + }; + + let tls_certs = match env::var("LUMINA_TLS_CERT_FILE") { + Ok(path) => Some(read_tls_certs(path).await?), + Err(_) => None, + }; + + // We do not use system's DNS because libp2p caches system DNS + // servers when `Swarm` get constructed, and doesn't update them + // later. This can be a problem, if device roams between networks + // (and old DNS addresses may not be reachable from the new network). + // + // Similarly, if node is started when there's no Internet connection, + // it won't use the DNS servers offered when Internet connectivity + // is restored. Instead we per-define globally-accessible public DNS servers. + let dns_config = dns::ResolverConfig::cloudflare(); + + let noise_config = + noise::Config::new(&keypair).map_err(|e| P2pError::NoiseInit(e.to_string()))?; + + let wss_transport = { + let config = if let (Some(key), Some(certs)) = (tls_key, tls_certs) { + let key = websocket::tls::PrivateKey::new(key.secret_der().to_vec()); + let certs = certs + .iter() + .map(|cert| websocket::tls::Certificate::new(cert.to_vec())); + + websocket::tls::Config::new(key, certs) + .map_err(|e| P2pError::TlsInit(format!("server config: {e}")))? + } else { + websocket::tls::Config::client() + }; + + let mut wss_transport = websocket::WsConfig::new(dns::tokio::Transport::custom( + tcp::tokio::Transport::new(tcp::Config::default()), + dns_config.clone(), dns::ResolverOpts::default(), - ) - .with_behaviour(|_| behaviour) - .expect("Moving behaviour doesn't fail") - .with_swarm_config(|config| { + )); + + wss_transport.set_tls_config(config); + + wss_transport + .upgrade(Version::V1Lazy) + .authenticate(noise_config.clone()) + .multiplex(yamux::Config::default()) + }; + + let tcp_transport = tcp::tokio::Transport::new(tcp::Config::default()) + .upgrade(Version::V1Lazy) + .authenticate(noise_config) + .multiplex(yamux::Config::default()); + + let quic_transport = quic::tokio::Transport::new(quic::Config::new(&keypair)); + + // WSS must be before TCP transport and must not be wrapped in DNS transport. + let transport = wss_transport + .or_transport(dns::tokio::Transport::custom( + tcp_transport + .or_transport(quic_transport) + .map(|either, _| match either { + Either::Left((peer_id, conn)) => (peer_id, StreamMuxerBox::new(conn)), + Either::Right((peer_id, conn)) => (peer_id, StreamMuxerBox::new(conn)), + }), + dns_config, + dns::ResolverOpts::default(), + )) + .map(|either, _| match either { + Either::Left((peer_id, conn)) => (peer_id, StreamMuxerBox::new(conn)), + Either::Right((peer_id, conn)) => (peer_id, StreamMuxerBox::new(conn)), + }) + .boxed(); + + let local_peer_id = PeerId::from_public_key(&keypair.public()); + + Ok(Swarm::new( + transport, + behaviour, + local_peer_id, + swarm::Config::with_tokio_executor() // TODO: Refactor code to avoid being idle. This can be done by preloading a // handler. This is how they fixed Kademlia: // https://github.com/libp2p/rust-libp2p/pull/4675/files - config.with_idle_connection_timeout(Duration::from_secs(15)) - }) - .build()) + .with_idle_connection_timeout(Duration::from_secs(15)), + )) } impl From for P2pError { @@ -55,19 +119,67 @@ mod imp { P2pError::NoiseInit(e.to_string()) } } + + async fn read_tls_key(path: impl AsRef) -> Result, P2pError> { + let path = path.as_ref(); + + // TODO: read key in a preallocated memory and zero it after use + let data = fs::read(&path) + .await + .map_err(|e| P2pError::TlsInit(format!("{}: {e}", path.display())))?; + + let mut data = Cursor::new(data); + + rustls_pemfile::private_key(&mut data) + .map_err(|e| P2pError::TlsInit(format!("{}: {e}", path.display())))? + .ok_or_else(|| P2pError::TlsInit(format!("{}: Key not found in file", path.display()))) + } + + async fn read_tls_certs( + path: impl AsRef, + ) -> Result>, P2pError> { + let path = path.as_ref(); + + let data = fs::read(path) + .await + .map_err(|e| P2pError::TlsInit(format!("{}: {e}", path.display())))?; + + let mut data = Cursor::new(data); + let certs = rustls_pemfile::certs(&mut data) + .collect::, std::io::Error>>() + .map_err(|e| P2pError::TlsInit(format!("{}: {e}", path.display())))?; + + if certs.is_empty() { + let e = format!("{}: Certificate not found in file", path.display()); + Err(P2pError::TlsInit(e)) + } else { + Ok(certs) + } + } } #[cfg(target_arch = "wasm32")] mod imp { use super::*; - use libp2p::webtransport_websys; + use libp2p::core::upgrade::Version; + use libp2p::{noise, websocket_websys, webtransport_websys, yamux, SwarmBuilder, Transport}; - pub(crate) fn new_swarm(keypair: Keypair, behaviour: B) -> Result, P2pError> + pub(crate) async fn new_swarm(keypair: Keypair, behaviour: B) -> Result> where B: NetworkBehaviour, { + let noise_config = + noise::Config::new(&keypair).map_err(|e| P2pError::NoiseInit(e.to_string()))?; + Ok(SwarmBuilder::with_existing_identity(keypair) .with_wasm_bindgen() + .with_other_transport(move |_| { + Ok(websocket_websys::Transport::default() + .upgrade(Version::V1Lazy) + .authenticate(noise_config) + .multiplex(yamux::Config::default())) + }) + .expect("websocket_websys::Transport is infallible") .with_other_transport(|local_keypair| { let config = webtransport_websys::Config::new(local_keypair); webtransport_websys::Transport::new(config)