Skip to content

Commit

Permalink
feat(node-wasm)!: Add websocket support (#341)
Browse files Browse the repository at this point in the history
Co-authored-by: Mikołaj Florkiewicz <[email protected]>
  • Loading branch information
zvolin and fl0rek authored Aug 1, 2024
1 parent 3393380 commit df62e20
Show file tree
Hide file tree
Showing 10 changed files with 487 additions and 92 deletions.
48 changes: 47 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion node-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ wasm-bindgen = "0.2.92"
wasm-bindgen-futures = "0.4.42"
web-sys = { version = "0.3.69", features = [
"BroadcastChannel",
"Crypto",
"DedicatedWorkerGlobalScope",
"Headers",
"MessageEvent",
Expand All @@ -66,6 +65,7 @@ web-sys = { version = "0.3.69", features = [
"Window",
"Worker",
"WorkerGlobalScope",
"WorkerNavigator",
"WorkerOptions",
"WorkerType",
] }
Expand Down
20 changes: 10 additions & 10 deletions node-wasm/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use lumina_node::store::IndexedDbStore;

use crate::error::{Context, Result};
use crate::utils::{
is_chrome, js_value_from_display, request_storage_persistence, resolve_dnsaddr_multiaddress,
Network,
is_safari, js_value_from_display, request_storage_persistence, resolve_dnsaddr_multiaddress,
shared_workers_supported, Network,
};
use crate::worker::commands::{CheckableResponseExt, NodeCommand, SingleHeaderQuery};
use crate::worker::{AnyWorker, WorkerClient};
Expand Down Expand Up @@ -92,17 +92,17 @@ impl NodeDriver {
worker_script_url: &str,
worker_type: Option<NodeWorkerKind>,
) -> Result<NodeDriver> {
if let Err(e) = request_storage_persistence().await {
error!("Error requesting storage persistence: {e}");
// Safari doesn't have the `navigator.storage()` api
if !is_safari()? {
if let Err(e) = request_storage_persistence().await {
error!("Error requesting storage persistence: {e}");
}
}

// For chrome we default to running in a dedicated Worker because:
// 1. Chrome Android does not support SharedWorkers at all
// 2. On desktop Chrome, restarting Lumina's worker causes all network connections to fail.
let default_worker_type = if is_chrome().unwrap_or(false) {
NodeWorkerKind::Dedicated
} else {
let default_worker_type = if shared_workers_supported().unwrap_or(false) {
NodeWorkerKind::Shared
} else {
NodeWorkerKind::Dedicated
};

let worker = AnyWorker::new(
Expand Down
75 changes: 52 additions & 23 deletions node-wasm/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::borrow::Cow;
use std::fmt::{self, Debug};
use std::net::{IpAddr, Ipv4Addr};

use js_sys::Math;
use libp2p::multiaddr::Protocol;
use libp2p::{Multiaddr, PeerId};
use lumina_node::network;
Expand All @@ -18,8 +19,8 @@ use tracing_web::{performance_layer, MakeConsoleWriter};
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
use web_sys::{
Crypto, DedicatedWorkerGlobalScope, Navigator, Request, RequestInit, RequestMode, Response,
SharedWorker, SharedWorkerGlobalScope, Worker,
DedicatedWorkerGlobalScope, Request, RequestInit, RequestMode, Response, SharedWorker,
SharedWorkerGlobalScope, Worker,
};

use crate::error::{Context, Error, Result};
Expand Down Expand Up @@ -161,6 +162,16 @@ where
/// have. This function doesn't `await` on JavaScript promise, as that would block until user
/// either allows or blocks our request in a prompt (and we cannot do much with the result anyway).
pub(crate) async fn request_storage_persistence() -> Result<(), Error> {
let storage_manager = if let Some(window) = web_sys::window() {
window.navigator().storage()
} else if Worker::is_worker_type() {
Worker::worker_self().navigator().storage()
} else if SharedWorker::is_worker_type() {
SharedWorker::worker_self().navigator().storage()
} else {
return Err(Error::new("`navigator.storage` not found in global scope"));
};

let fullfiled = Closure::once(move |granted: JsValue| {
if granted.is_truthy() {
info!("Storage persistence acquired: {:?}", granted);
Expand All @@ -173,10 +184,7 @@ pub(crate) async fn request_storage_persistence() -> Result<(), Error> {
});

// don't drop the promise, we'll log the result and hope the user clicked the right button
let _promise = get_navigator()?
.storage()
.persist()?
.then2(&fullfiled, &rejected);
let _promise = storage_manager.persist()?.then2(&fullfiled, &rejected);

// stop rust from dropping them
fullfiled.forget();
Expand All @@ -186,29 +194,50 @@ pub(crate) async fn request_storage_persistence() -> Result<(), Error> {
}

const CHROME_USER_AGENT_DETECTION_STR: &str = "Chrome/";
const FIREFOX_USER_AGENT_DETECTION_STR: &str = "Firefox/";
const SAFARI_USER_AGENT_DETECTION_STR: &str = "Safari/";

// Currently, there's an issue with SharedWorkers on Chrome where restarting Lumina's worker
// causes all network connections to fail. Until that's resolved detect chrome and apply
// a workaround.
pub(crate) fn get_user_agent() -> Result<String, Error> {
if let Some(window) = web_sys::window() {
Ok(window.navigator().user_agent()?)
} else if Worker::is_worker_type() {
Ok(Worker::worker_self().navigator().user_agent()?)
} else if SharedWorker::is_worker_type() {
Ok(SharedWorker::worker_self().navigator().user_agent()?)
} else {
Err(Error::new(
"`navigator.user_agent` not found in global scope",
))
}
}

#[allow(dead_code)]
pub(crate) fn is_chrome() -> Result<bool, Error> {
get_navigator()?
.user_agent()
.context("could not get UserAgent from Navigator")
.map(|user_agent| user_agent.contains(CHROME_USER_AGENT_DETECTION_STR))
let user_agent = get_user_agent()?;
Ok(user_agent.contains(CHROME_USER_AGENT_DETECTION_STR))
}

pub(crate) fn is_firefox() -> Result<bool, Error> {
let user_agent = get_user_agent()?;
Ok(user_agent.contains(FIREFOX_USER_AGENT_DETECTION_STR))
}

pub(crate) fn is_safari() -> Result<bool, Error> {
let user_agent = get_user_agent()?;
// Chrome contains `Safari/`, so make sure user agent doesn't contain `Chrome/`
Ok(user_agent.contains(SAFARI_USER_AGENT_DETECTION_STR)
&& !user_agent.contains(CHROME_USER_AGENT_DETECTION_STR))
}

pub(crate) fn get_navigator() -> Result<Navigator, Error> {
js_sys::Reflect::get(&js_sys::global(), &JsValue::from_str("navigator"))
.context("failed to get `navigator` from global object")?
.dyn_into::<Navigator>()
.context("`navigator` is not instanceof `Navigator`")
pub(crate) fn shared_workers_supported() -> Result<bool, Error> {
// For chrome we default to running in a dedicated Worker because:
// 1. Chrome Android does not support SharedWorkers at all
// 2. On desktop Chrome, restarting Lumina's worker causes all network connections to fail.
Ok(is_firefox()? || is_safari()?)
}

pub(crate) fn get_crypto() -> Result<Crypto, Error> {
js_sys::Reflect::get(&js_sys::global(), &JsValue::from_str("crypto"))
.context("failed to get `crypto` from global object")?
.dyn_into::<web_sys::Crypto>()
.context("`crypto` is not `Crypto` type")
pub(crate) fn random_id() -> u32 {
(Math::random() * f64::from(u32::MAX)).floor() as u32
}

async fn fetch(url: &str, opts: &RequestInit, headers: &[(&str, &str)]) -> Result<Response, Error> {
Expand Down
4 changes: 2 additions & 2 deletions node-wasm/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use lumina_node::store::{IndexedDbStore, SamplingMetadata, Store};

use crate::error::{Context, Error, Result};
use crate::node::WasmNodeConfig;
use crate::utils::{get_crypto, WorkerSelf};
use crate::utils::{random_id, WorkerSelf};
use crate::worker::channel::{
DedicatedWorkerMessageServer, MessageServer, SharedWorkerMessageServer, WorkerMessage,
};
Expand Down Expand Up @@ -240,7 +240,7 @@ impl NodeWorker {
pub async fn run_worker(queued_events: Vec<MessageEvent>) -> Result<()> {
info!("Entered run_worker");
let (tx, mut rx) = mpsc::channel(WORKER_MESSAGE_SERVER_INCOMING_QUEUE_LENGTH);
let events_channel_name = format!("NodeEventChannel-{}", get_crypto()?.random_uuid());
let events_channel_name = format!("NodeEventChannel-{}", random_id());

let mut message_server: Box<dyn MessageServer> = if SharedWorker::is_worker_type() {
Box::new(SharedWorkerMessageServer::new(tx.clone(), queued_events))
Expand Down
14 changes: 12 additions & 2 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ instant = "0.1.13"
prost = "0.12.6"
rand = "0.8.5"
serde = { version = "1.0.203", features = ["derive"] }
smallvec = { version = "1.13.2", features = ["union", "const_generics", "serde"] }
smallvec = { version = "1.13.2", features = [
"union",
"const_generics",
"serde",
] }
thiserror = "1.0.61"
tokio = { version = "1.38.0", features = ["macros", "sync"] }
tokio-util = "0.7.11"
Expand All @@ -52,16 +56,19 @@ web-time = "1.1.0"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
backoff = { version = "0.4.0", features = ["tokio"] }
blockstore = { workspace = true, features = ["redb"] }
tokio = { version = "1.38.0", features = ["rt-multi-thread", "time"] }
tokio = { version = "1.38.0", features = ["fs", "rt-multi-thread", "time"] }
libp2p = { workspace = true, features = [
"noise",
"dns",
"tcp",
"tokio",
"yamux",
"websocket",
"quic",
] }
redb = "2.1.1"
rustls-pemfile = "2.1.2"
rustls-pki-types = "1.7.0"

[target.'cfg(target_arch = "wasm32")'.dependencies]
backoff = { version = "0.4.0", features = ["wasm-bindgen"] }
Expand All @@ -71,8 +78,11 @@ celestia-types = { workspace = true, features = ["wasm-bindgen"] }
getrandom = { version = "0.2.15", features = ["js"] }
gloo-timers = { version = "0.3.0", features = ["futures"] }
libp2p = { workspace = true, features = [
"noise",
"wasm-bindgen",
"webtransport-websys",
"websocket-websys",
"yamux",
] }
pin-project = "1.1.5"
rexie = "0.5.0"
Expand Down
21 changes: 12 additions & 9 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,18 @@ where
let event_sub = event_channel.subscribe();
let store = Arc::new(config.store);

let p2p = Arc::new(P2p::start(P2pArgs {
network_id: config.network_id,
local_keypair: config.p2p_local_keypair,
bootnodes: config.p2p_bootnodes,
listen_on: config.p2p_listen_on,
blockstore: config.blockstore,
store: store.clone(),
event_pub: event_channel.publisher(),
})?);
let p2p = Arc::new(
P2p::start(P2pArgs {
network_id: config.network_id,
local_keypair: config.p2p_local_keypair,
bootnodes: config.p2p_bootnodes,
listen_on: config.p2p_listen_on,
blockstore: config.blockstore,
store: store.clone(),
event_pub: event_channel.publisher(),
})
.await?,
);

let syncer = Arc::new(Syncer::start(SyncerArgs {
store: store.clone(),
Expand Down
Loading

0 comments on commit df62e20

Please sign in to comment.