From dae322c4c13e66c468a94d5aa208ab4fd4e30ae8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Florkiewicz?= Date: Thu, 19 Sep 2024 15:22:56 +0200 Subject: [PATCH] feat(node-wasm)!: Webpack compatibility (#377) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mikołaj Florkiewicz Co-authored-by: Maciej Zwoliński --- Cargo.lock | 1 + node-wasm/Cargo.toml | 4 + node-wasm/README.md | 34 ++- node-wasm/js/README.md | 1 + node-wasm/js/index.d.ts | 6 + node-wasm/js/index.js | 14 + node-wasm/js/package.json | 31 ++ node-wasm/js/worker.js | 10 + node-wasm/src/{node.rs => client.rs} | 139 +++------ node-wasm/src/{worker => }/commands.rs | 5 +- node-wasm/src/lib.rs | 7 +- node-wasm/src/ports.rs | 288 +++++++++++++++++++ node-wasm/src/utils.rs | 23 +- node-wasm/src/worker.rs | 151 +++++----- node-wasm/src/worker/channel.rs | 381 ------------------------- 15 files changed, 529 insertions(+), 566 deletions(-) create mode 120000 node-wasm/js/README.md create mode 100644 node-wasm/js/index.d.ts create mode 100644 node-wasm/js/index.js create mode 100644 node-wasm/js/package.json create mode 100644 node-wasm/js/worker.js rename node-wasm/src/{node.rs => client.rs} (71%) rename node-wasm/src/{worker => }/commands.rs (96%) create mode 100644 node-wasm/src/ports.rs delete mode 100644 node-wasm/src/worker/channel.rs diff --git a/Cargo.lock b/Cargo.lock index 9e352062..d9981590 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3312,6 +3312,7 @@ dependencies = [ "tracing-web", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-bindgen-test", "web-sys", ] diff --git a/node-wasm/Cargo.toml b/node-wasm/Cargo.toml index a50145d5..14b128c6 100644 --- a/node-wasm/Cargo.toml +++ b/node-wasm/Cargo.toml @@ -52,6 +52,7 @@ web-sys = { version = "0.3.70", features = [ "BroadcastChannel", "DedicatedWorkerGlobalScope", "Headers", + "MessageChannel", "MessageEvent", "MessagePort", "Navigator", @@ -70,6 +71,9 @@ web-sys = { version = "0.3.70", features = [ "WorkerType", ] } +[target.'cfg(target_arch = "wasm32")'.dev-dependencies] +wasm-bindgen-test = "0.3.42" + [package.metadata.docs.rs] targets = ["wasm32-unknown-unknown"] diff --git a/node-wasm/README.md b/node-wasm/README.md index 547f80ef..b34e6e4b 100644 --- a/node-wasm/README.md +++ b/node-wasm/README.md @@ -3,14 +3,40 @@ A compatibility layer for the [`Lumina`](https://github.com/eigerco/lumina) node to work within a browser environment and be operable with javascript. +# Example +Starting lumina inside a dedicated worker + ```javascript -import init, { Node, NodeConfig, Network } from "/wasm/lumina_node_wasm.js"; +import { spawnNode, NodeConfig, Network } from "lumina-node"; -await init(); +const node = await spawnNode(); +const mainnetConfig = NodeConfig.default(Network.Mainnet); -const config = NodeConfig.default(Network.Mainnet); -const node = await new Node(config); +await node.start(mainnetConfig); await node.wait_connected(); await node.request_head_header(); ``` + +## Manual setup + +Note that `spawnNode` implicitly calls wasm initialisation code. If you want to set things up manually, make sure to call the default export before using any of the wasm functionality. + +```javascript +import init, { NodeConfig, Network } from "lumina-node"; + +await init(); +const config = NodeConfig.default(Network.Mainnet); + +// client and worker accept any object with MessagePort like interface e.g. Worker +const channel = new MessageChannel(); +const worker = new NodeWorker(channel.port1); + +// note that this runs lumina in the current context (and doesn't create a new web-worker). Promise created with `.run()` never completes. +const worker_promise = worker.run(); + +// client port can be used locally or transferred like any plain MessagePort +const client = await new NodeClient(channel.port2); +await client.wait_connected(); +await client.request_head_header(); +``` diff --git a/node-wasm/js/README.md b/node-wasm/js/README.md new file mode 120000 index 00000000..32d46ee8 --- /dev/null +++ b/node-wasm/js/README.md @@ -0,0 +1 @@ +../README.md \ No newline at end of file diff --git a/node-wasm/js/index.d.ts b/node-wasm/js/index.d.ts new file mode 100644 index 00000000..41ad3977 --- /dev/null +++ b/node-wasm/js/index.d.ts @@ -0,0 +1,6 @@ +/** +* Spawn a worker running lumina node and get the `NodeClient` connected to it. +*/ +export function spawnNode(): Promise; +export * from "lumina-node-wasm"; +export default function init(): Promise; diff --git a/node-wasm/js/index.js b/node-wasm/js/index.js new file mode 100644 index 00000000..d02ec0ae --- /dev/null +++ b/node-wasm/js/index.js @@ -0,0 +1,14 @@ +import init, { NodeClient } from "lumina-node-wasm" + +/** +* Spawn a worker running lumina node and get the `NodeClient` connected to it. +*/ +export async function spawnNode() { + await init(); + let worker = new Worker(new URL("worker.js", import.meta.url)); + let client = await new NodeClient(worker); + return client; +} + +export * from "lumina-node-wasm"; +export default init; diff --git a/node-wasm/js/package.json b/node-wasm/js/package.json new file mode 100644 index 00000000..da70301b --- /dev/null +++ b/node-wasm/js/package.json @@ -0,0 +1,31 @@ +{ + "name": "lumina-node", + "type": "module", + "collaborators": [ + "Eiger " + ], + "description": "Lumina node for Celestia, running in browser", + "version": "0.2.0", + "license": "Apache-2.0", + "repository": { + "type": "git", + "url": "git+https://github.com/eigerco/lumina.git" + }, + "files": [ + "index.js", + "index.d.ts", + "worker.js" + ], + "main": "index.js", + "homepage": "https://www.eiger.co", + "dependencies": { + "lumina-node-wasm": "0.2.0" + }, + "keywords": [ + "blockchain", + "celestia", + "lumina", + "node", + "browser" + ] +} diff --git a/node-wasm/js/worker.js b/node-wasm/js/worker.js new file mode 100644 index 00000000..1a2a2d12 --- /dev/null +++ b/node-wasm/js/worker.js @@ -0,0 +1,10 @@ +import init, { NodeWorker, NodeClient } from "lumina-node-wasm" + +Error.stackTraceLimit = 99; + +init().then(async () => { + let worker = new NodeWorker(self); + console.log("starting worker: ", worker); + + await worker.run(); +}); diff --git a/node-wasm/src/node.rs b/node-wasm/src/client.rs similarity index 71% rename from node-wasm/src/node.rs rename to node-wasm/src/client.rs index 8f65a7a7..699de80c 100644 --- a/node-wasm/src/node.rs +++ b/node-wasm/src/client.rs @@ -13,19 +13,17 @@ use lumina_node::network::{canonical_network_bootnodes, network_id}; use lumina_node::node::NodeConfig; use lumina_node::store::IndexedDbStore; +use crate::commands::{CheckableResponseExt, NodeCommand, SingleHeaderQuery}; use crate::error::{Context, Result}; +use crate::ports::WorkerClient; use crate::utils::{ is_safari, js_value_from_display, request_storage_persistence, resolve_dnsaddr_multiaddress, - shared_workers_supported, Network, + Network, }; -use crate::worker::commands::{CheckableResponseExt, NodeCommand, SingleHeaderQuery}; -use crate::worker::{AnyWorker, WorkerClient}; use crate::wrapper::libp2p::NetworkInfoSnapshot; -const LUMINA_WORKER_NAME: &str = "lumina"; - /// Config for the lumina wasm node. -#[wasm_bindgen(js_name = NodeConfig)] +#[wasm_bindgen(inspectable, js_name = NodeConfig)] #[derive(Serialize, Deserialize, Debug)] pub struct WasmNodeConfig { /// A network to connect to. @@ -35,63 +33,21 @@ pub struct WasmNodeConfig { pub bootnodes: Vec, } -/// `NodeDriver` represents lumina node running in a dedicated Worker/SharedWorker. -/// It's responsible for sending commands and receiving responses from the node. -#[wasm_bindgen(js_name = NodeClient)] -struct NodeDriver { - client: WorkerClient, -} - -/// Type of worker to run lumina in. Allows overriding automatically detected worker kind -/// (which should usually be appropriate). +/// `NodeClient` is responsible for steering [`NodeWorker`] by sending it commands and receiving +/// responses over the provided port. +/// +/// [`NodeWorker`]: crate::worker::NodeWorker #[wasm_bindgen] -pub enum NodeWorkerKind { - /// Run in [`SharedWorker`] - /// - /// [`SharedWorker`]: https://developer.mozilla.org/en-US/docs/Web/API/SharedWorker - Shared, - /// Run in [`Worker`] - /// - /// [`Worker`]: https://developer.mozilla.org/en-US/docs/Web/API/Worker - Dedicated, +struct NodeClient { + worker: WorkerClient, } -#[wasm_bindgen(js_class = NodeClient)] -impl NodeDriver { - /// Create a new connection to a Lumina node running in a Shared Worker. - /// Note that single Shared Worker can be accessed from multiple tabs, so Lumina may - /// already have been started. Otherwise it needs to be started with [`NodeDriver::start`]. - /// - /// Requires serving a worker script and providing an url to it. The script should look like - /// so (the import statement may vary depending on your js-bundler): - /// ```js - /// import init, { run_worker } from 'lumina_node_wasm.js'; - /// - /// Error.stackTraceLimit = 99; - /// - /// // for SharedWorker we queue incoming connections - /// // for dedicated Worker we queue incoming messages (coming from the single client) - /// let queued = []; - /// if (typeof SharedWorkerGlobalScope !== 'undefined' && self instanceof SharedWorkerGlobalScope) { - /// onconnect = (event) => { - /// queued.push(event) - /// } - /// } else { - /// onmessage = (event) => { - /// queued.push(event); - /// } - /// } - /// - /// init().then(() => { - /// console.log("starting worker, queued messages: ", queued.length); - /// run_worker(queued); - /// }) - /// ``` +#[wasm_bindgen] +impl NodeClient { + /// Create a new connection to a Lumina node running in [`NodeWorker`]. Provided `port` is + /// expected to have `MessagePort`-like interface for sending and receiving messages. #[wasm_bindgen(constructor)] - pub async fn new( - worker_script_url: &str, - worker_type: Option, - ) -> Result { + pub async fn new(port: JsValue) -> Result { // Safari doesn't have the `navigator.storage()` api if !is_safari()? { if let Err(e) = request_storage_persistence().await { @@ -99,27 +55,20 @@ impl NodeDriver { } } - let default_worker_type = if shared_workers_supported().unwrap_or(false) { - NodeWorkerKind::Shared - } else { - NodeWorkerKind::Dedicated - }; - - let worker = AnyWorker::new( - worker_type.unwrap_or(default_worker_type), - worker_script_url, - LUMINA_WORKER_NAME, - )?; - Ok(Self { - client: WorkerClient::new(worker), + worker: WorkerClient::new(port)?, }) } + /// Establish a new connection to the existing worker over provided port + pub async fn add_connection_to_worker(&self, port: &JsValue) -> Result<()> { + self.worker.add_connection_to_worker(port).await + } + /// Check whether Lumina is currently running pub async fn is_running(&self) -> Result { let command = NodeCommand::IsRunning; - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let running = response.into_is_running().check_variant()?; Ok(running) @@ -128,7 +77,7 @@ impl NodeDriver { /// Start a node with the provided config, if it's not running pub async fn start(&self, config: WasmNodeConfig) -> Result<()> { let command = NodeCommand::StartNode(config); - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; response.into_node_started().check_variant()??; Ok(()) @@ -137,7 +86,7 @@ impl NodeDriver { /// Get node's local peer ID. pub async fn local_peer_id(&self) -> Result { let command = NodeCommand::GetLocalPeerId; - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let peer_id = response.into_local_peer_id().check_variant()?; Ok(peer_id) @@ -146,7 +95,7 @@ impl NodeDriver { /// Get current [`PeerTracker`] info. pub async fn peer_tracker_info(&self) -> Result { let command = NodeCommand::GetPeerTrackerInfo; - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let peer_info = response.into_peer_tracker_info().check_variant()?; Ok(to_value(&peer_info)?) @@ -155,7 +104,7 @@ impl NodeDriver { /// Wait until the node is connected to at least 1 peer. pub async fn wait_connected(&self) -> Result<()> { let command = NodeCommand::WaitConnected { trusted: false }; - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let _ = response.into_connected().check_variant()?; Ok(()) @@ -164,14 +113,14 @@ impl NodeDriver { /// Wait until the node is connected to at least 1 trusted peer. pub async fn wait_connected_trusted(&self) -> Result<()> { let command = NodeCommand::WaitConnected { trusted: true }; - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; response.into_connected().check_variant()? } /// Get current network info. pub async fn network_info(&self) -> Result { let command = NodeCommand::GetNetworkInfo; - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; response.into_network_info().check_variant()? } @@ -179,7 +128,7 @@ impl NodeDriver { /// Get all the multiaddresses on which the node listens. pub async fn listeners(&self) -> Result { let command = NodeCommand::GetListeners; - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let listeners = response.into_listeners().check_variant()?; let result = listeners?.iter().map(js_value_from_display).collect(); @@ -189,7 +138,7 @@ impl NodeDriver { /// Get all the peers that node is connected to. pub async fn connected_peers(&self) -> Result { let command = NodeCommand::GetConnectedPeers; - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let peers = response.into_connected_peers().check_variant()?; let result = peers?.iter().map(js_value_from_display).collect(); @@ -202,14 +151,14 @@ impl NodeDriver { peer_id: peer_id.parse()?, is_trusted, }; - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; response.into_set_peer_trust().check_variant()? } /// Request the head header from the network. pub async fn request_head_header(&self) -> Result { let command = NodeCommand::RequestHeader(SingleHeaderQuery::Head); - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let header = response.into_header().check_variant()?; header.into() @@ -218,7 +167,7 @@ impl NodeDriver { /// Request a header for the block with a given hash from the network. pub async fn request_header_by_hash(&self, hash: &str) -> Result { let command = NodeCommand::RequestHeader(SingleHeaderQuery::ByHash(hash.parse()?)); - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let header = response.into_header().check_variant()?; header.into() @@ -227,7 +176,7 @@ impl NodeDriver { /// Request a header for the block with a given height from the network. pub async fn request_header_by_height(&self, height: u64) -> Result { let command = NodeCommand::RequestHeader(SingleHeaderQuery::ByHeight(height)); - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let header = response.into_header().check_variant()?; header.into() @@ -245,7 +194,7 @@ impl NodeDriver { from: from_header, amount, }; - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let headers = response.into_headers().check_variant()?; headers.into() @@ -254,7 +203,7 @@ impl NodeDriver { /// Get current header syncing info. pub async fn syncer_info(&self) -> Result { let command = NodeCommand::GetSyncerInfo; - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let syncer_info = response.into_syncer_info().check_variant()?; Ok(to_value(&syncer_info?)?) @@ -263,7 +212,7 @@ impl NodeDriver { /// Get the latest header announced in the network. pub async fn get_network_head_header(&self) -> Result { let command = NodeCommand::LastSeenNetworkHead; - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let header = response.into_last_seen_network_head().check_variant()?; header.into() @@ -272,7 +221,7 @@ impl NodeDriver { /// Get the latest locally synced header. pub async fn get_local_head_header(&self) -> Result { let command = NodeCommand::GetHeader(SingleHeaderQuery::Head); - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let header = response.into_header().check_variant()?; header.into() @@ -281,7 +230,7 @@ impl NodeDriver { /// Get a synced header for the block with a given hash. pub async fn get_header_by_hash(&self, hash: &str) -> Result { let command = NodeCommand::GetHeader(SingleHeaderQuery::ByHash(hash.parse()?)); - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let header = response.into_header().check_variant()?; header.into() @@ -290,7 +239,7 @@ impl NodeDriver { /// Get a synced header for the block with a given height. pub async fn get_header_by_height(&self, height: u64) -> Result { let command = NodeCommand::GetHeader(SingleHeaderQuery::ByHeight(height)); - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let header = response.into_header().check_variant()?; header.into() @@ -314,7 +263,7 @@ impl NodeDriver { start_height, end_height, }; - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let headers = response.into_headers().check_variant()?; headers.into() @@ -323,7 +272,7 @@ impl NodeDriver { /// Get data sampling metadata of an already sampled height. pub async fn get_sampling_metadata(&self, height: u64) -> Result { let command = NodeCommand::GetSamplingMetadata { height }; - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let metadata = response.into_sampling_metadata().check_variant()?; Ok(to_value(&metadata?)?) @@ -333,7 +282,7 @@ impl NodeDriver { /// be processed and new NodeClient needs to be created to restart a node. pub async fn close(&self) -> Result<()> { let command = NodeCommand::CloseWorker; - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; response.into_worker_closed().check_variant()?; Ok(()) @@ -342,7 +291,7 @@ impl NodeDriver { /// Returns a [`BroadcastChannel`] for events generated by [`Node`]. pub async fn events_channel(&self) -> Result { let command = NodeCommand::GetEventsChannelName; - let response = self.client.exec(command).await?; + let response = self.worker.exec(command).await?; let name = response.into_events_channel_name().check_variant()?; Ok(BroadcastChannel::new(&name).unwrap()) diff --git a/node-wasm/src/worker/commands.rs b/node-wasm/src/commands.rs similarity index 96% rename from node-wasm/src/worker/commands.rs rename to node-wasm/src/commands.rs index 5f822593..82ff5404 100644 --- a/node-wasm/src/worker/commands.rs +++ b/node-wasm/src/commands.rs @@ -12,14 +12,15 @@ use celestia_types::hash::Hash; use lumina_node::node::{PeerTrackerInfo, SyncingInfo}; use lumina_node::store::SamplingMetadata; +use crate::client::WasmNodeConfig; use crate::error::Error; use crate::error::Result; -use crate::node::WasmNodeConfig; use crate::utils::JsResult; use crate::wrapper::libp2p::NetworkInfoSnapshot; #[derive(Debug, Serialize, Deserialize)] pub(crate) enum NodeCommand { + InternalPing, IsRunning, StartNode(WasmNodeConfig), GetEventsChannelName, @@ -63,6 +64,8 @@ pub(crate) enum SingleHeaderQuery { #[derive(Serialize, Deserialize, Debug, EnumAsInner)] pub(crate) enum WorkerResponse { + InternalPong, + NodeNotRunning, IsRunning(bool), NodeStarted(Result<()>), EventsChannelName(String), diff --git a/node-wasm/src/lib.rs b/node-wasm/src/lib.rs index b7a131c9..f6bcdcab 100644 --- a/node-wasm/src/lib.rs +++ b/node-wasm/src/lib.rs @@ -1,8 +1,13 @@ #![doc = include_str!("../README.md")] #![cfg(target_arch = "wasm32")] +pub mod client; +mod commands; pub mod error; -pub mod node; +mod ports; pub mod utils; mod worker; mod wrapper; + +#[cfg(test)] +wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); diff --git a/node-wasm/src/ports.rs b/node-wasm/src/ports.rs new file mode 100644 index 00000000..6a8c1d71 --- /dev/null +++ b/node-wasm/src/ports.rs @@ -0,0 +1,288 @@ +use js_sys::{Array, Function, Reflect}; +use serde::Serialize; +use serde_wasm_bindgen::{from_value, to_value, Serializer}; +use tokio::sync::{mpsc, Mutex}; +use tracing::{error, info, trace}; +use wasm_bindgen::closure::Closure; +use wasm_bindgen::prelude::*; +use wasm_bindgen::{JsCast, JsValue}; +use web_sys::{MessageEvent, MessagePort}; + +use crate::commands::{NodeCommand, WorkerResponse}; +use crate::error::{Context, Error, Result}; +use crate::utils::MessageEventExt; + +// Instead of supporting communication with just `MessagePort`, allow using any object which +// provides compatible interface, eg. `Worker` +#[wasm_bindgen] +extern "C" { + pub type MessagePortLike; + + #[wasm_bindgen(catch, method, structural, js_name = postMessage)] + pub fn post_message(this: &MessagePortLike, message: &JsValue) -> Result<(), JsValue>; + + #[wasm_bindgen(catch, method, structural, js_name = postMessage)] + pub fn post_message_with_transferable( + this: &MessagePortLike, + message: &JsValue, + transferable: &JsValue, + ) -> Result<(), JsValue>; +} + +impl From for MessagePortLike { + fn from(port: MessagePort) -> Self { + JsValue::from(port).into() + } +} + +#[derive(Debug, Clone, Copy)] +pub struct ClientId(usize); + +pub(crate) enum ClientMessage { + Command { id: ClientId, command: NodeCommand }, + AddConnection(JsValue), +} + +struct ClientConnection { + port: MessagePortLike, + _onmessage: Closure, +} + +impl ClientConnection { + fn new( + id: ClientId, + port_like_object: JsValue, + server_tx: mpsc::UnboundedSender, + ) -> Result { + let onmessage = Closure::new(move |ev: MessageEvent| { + if let Some(port) = ev.get_port() { + if let Err(e) = server_tx.send(ClientMessage::AddConnection(port)) { + error!("port forwarding channel closed, shouldn't happen: {e}"); + } + } + + let command = match from_value(ev.data()) { + Ok(msg) => msg, + Err(e) => { + error!("could not deserialise message from {id:?}: {e}"); + return; + } + }; + + if let Err(e) = server_tx.send(ClientMessage::Command { id, command }) { + error!("message forwarding channel closed, shouldn't happen: {e}"); + } + }); + + let port = prepare_message_port(port_like_object, &onmessage) + .context("failed to setup port for ClientConnection")?; + + Ok(ClientConnection { + port, + _onmessage: onmessage, + }) + } + + fn send(&self, message: &WorkerResponse) -> Result<()> { + let serializer = Serializer::json_compatible(); + let message_value = message + .serialize(&serializer) + .context("could not serialise message")?; + self.port + .post_message(&message_value) + .context("could not send command to worker")?; + Ok(()) + } +} + +pub struct WorkerServer { + ports: Vec, + client_tx: mpsc::UnboundedSender, + client_rx: mpsc::UnboundedReceiver, +} + +impl WorkerServer { + pub fn new() -> WorkerServer { + let (client_tx, client_rx) = mpsc::unbounded_channel(); + + WorkerServer { + ports: vec![], + client_tx, + client_rx, + } + } + + pub async fn recv(&mut self) -> Result<(ClientId, NodeCommand)> { + loop { + match self + .client_rx + .recv() + .await + .expect("all of client connections should never close") + { + ClientMessage::Command { id, command } => { + return Ok((id, command)); + } + ClientMessage::AddConnection(port) => { + let client_id = ClientId(self.ports.len()); + info!("Connecting client {client_id:?}"); + + match ClientConnection::new(client_id, port, self.client_tx.clone()) { + Ok(port) => self.ports.push(port), + Err(e) => error!("Failed to setup ClientConnection: {e}"), + } + } + } + } + } + + pub fn get_control_channel(&self) -> mpsc::UnboundedSender { + self.client_tx.clone() + } + + pub fn respond_to(&self, client: ClientId, response: WorkerResponse) { + trace!("Responding to {client:?}"); + if let Err(e) = self.ports[client.0].send(&response) { + error!("Failed to send response to client: {e}"); + } + } +} + +pub struct WorkerClient { + port: MessagePortLike, + response_channel: + Mutex>>, + _onmessage: Closure, +} + +impl WorkerClient { + pub fn new(object: JsValue) -> Result { + let (response_tx, response_rx) = mpsc::unbounded_channel(); + + let onmessage = Closure::new(move |ev: MessageEvent| { + if let Err(e) = response_tx.send(from_value(ev.data())) { + error!("message forwarding channel closed, should not happen: {e}"); + } + }); + + let port = prepare_message_port(object, &onmessage) + .context("failed to setup port for WorkerClient")?; + + Ok(WorkerClient { + port, + response_channel: Mutex::new(response_rx), + _onmessage: onmessage, + }) + } + + pub(crate) async fn add_connection_to_worker(&self, port: &JsValue) -> Result<()> { + let mut response_channel = self.response_channel.lock().await; + + let command_value = + to_value(&NodeCommand::InternalPing).context("could not serialise message")?; + + self.port + .post_message_with_transferable(&command_value, &Array::of1(port)) + .context("could not transfer port")?; + + let worker_response = response_channel + .recv() + .await + .expect("response channel should never drop") + .context("error adding connection")?; + + if !worker_response.is_internal_pong() { + Err(Error::new(&format!( + "invalid response, expected InternalPing got {worker_response:?}" + ))) + } else { + Ok(()) + } + } + + pub(crate) async fn exec(&self, command: NodeCommand) -> Result { + let mut response_channel = self.response_channel.lock().await; + let command_value = to_value(&command).context("could not serialise message")?; + + self.port + .post_message(&command_value) + .context("could not post message")?; + + let worker_response = response_channel + .recv() + .await + .expect("response channel should never drop") + .context("error executing command")?; + + Ok(worker_response) + } +} + +// helper to hide slight differences in message passing between runtime.Port used by browser +// extensions and everything else +fn prepare_message_port( + object: JsValue, + callback: &Closure, +) -> Result { + // check whether provided object has `postMessage` method + let _post_message: Function = Reflect::get(&object, &"postMessage".into())? + .dyn_into() + .context("could not get object's postMessage")?; + + if Reflect::has(&object, &JsValue::from("onMessage")) + .context("failed to reflect onMessage property")? + { + // Browser extension runtime.Port has `onMessage` property, on which we should call + // `addListener` on. + let listeners = Reflect::get(&object, &"onMessage".into()) + .context("could not get `onMessage` property")?; + + let add_listener: Function = Reflect::get(&listeners, &"addListener".into()) + .context("could not get `onMessage.addListener` property")? + .dyn_into() + .context("expected `onMessage.addListener` to be a function")?; + Reflect::apply(&add_listener, &listeners, &Array::of1(callback.as_ref())) + .context("error calling `onMessage.addListener`")?; + } else if Reflect::has(&object, &JsValue::from("onmessage")) + .context("failed to reflect onmessage property")? + { + // MessagePort, as well as message passing via Worker instance, requires setting + // `onmessage` property to callback + Reflect::set(&object, &"onmessage".into(), callback.as_ref()) + .context("could not set onmessage callback")?; + } else { + return Err(Error::new("Don't know how to register onmessage callback")); + } + + Ok(MessagePortLike::from(object)) +} + +#[cfg(test)] +mod tests { + use super::*; + use wasm_bindgen_futures::spawn_local; + use wasm_bindgen_test::wasm_bindgen_test; + use web_sys::MessageChannel; + + #[wasm_bindgen_test] + async fn client_server() { + let mut server = WorkerServer::new(); + let tx = server.get_control_channel(); + + // pre-load response + spawn_local(async move { + let channel = MessageChannel::new().unwrap(); + + tx.send(ClientMessage::AddConnection(channel.port2().into())) + .unwrap(); + + let client0 = WorkerClient::new(channel.port1().into()).unwrap(); + let response = client0.exec(NodeCommand::IsRunning).await.unwrap(); + assert!(matches!(response, WorkerResponse::IsRunning(true))); + }); + + let (client, command) = server.recv().await.unwrap(); + assert!(matches!(command, NodeCommand::IsRunning)); + server.respond_to(client, WorkerResponse::IsRunning(true)); + } +} diff --git a/node-wasm/src/utils.rs b/node-wasm/src/utils.rs index e3bc04f2..9f08a6e3 100644 --- a/node-wasm/src/utils.rs +++ b/node-wasm/src/utils.rs @@ -18,8 +18,8 @@ use tracing_web::MakeConsoleWriter; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::JsFuture; use web_sys::{ - DedicatedWorkerGlobalScope, Request, RequestInit, RequestMode, Response, SharedWorker, - SharedWorkerGlobalScope, Worker, + DedicatedWorkerGlobalScope, MessageEvent, Request, RequestInit, RequestMode, Response, + SharedWorker, SharedWorkerGlobalScope, Worker, }; use crate::error::{Context, Error, Result}; @@ -153,6 +153,23 @@ where } } +pub(crate) trait MessageEventExt { + fn get_port(&self) -> Option; +} + +impl MessageEventExt for MessageEvent { + fn get_port(&self) -> Option { + let ports = self.ports(); + if ports.is_array() { + let port = ports.get(0); + if !port.is_undefined() { + return Some(port); + } + } + None + } +} + /// Request persistent storage from user for us, which has side effect of increasing the quota we /// have. This function doesn't `await` on JavaScript promise, as that would block until user /// either allows or blocks our request in a prompt (and we cannot do much with the result anyway). @@ -212,6 +229,7 @@ pub(crate) fn is_chrome() -> Result { Ok(user_agent.contains(CHROME_USER_AGENT_DETECTION_STR)) } +#[allow(dead_code)] pub(crate) fn is_firefox() -> Result { let user_agent = get_user_agent()?; Ok(user_agent.contains(FIREFOX_USER_AGENT_DETECTION_STR)) @@ -224,6 +242,7 @@ pub(crate) fn is_safari() -> Result { && !user_agent.contains(CHROME_USER_AGENT_DETECTION_STR)) } +#[allow(dead_code)] pub(crate) fn shared_workers_supported() -> Result { // For chrome we default to running in a dedicated Worker because: // 1. Chrome Android does not support SharedWorkers at all diff --git a/node-wasm/src/worker.rs b/node-wasm/src/worker.rs index 083e4fbe..c0bce2e3 100644 --- a/node-wasm/src/worker.rs +++ b/node-wasm/src/worker.rs @@ -2,36 +2,27 @@ use std::fmt::Debug; use js_sys::Array; use libp2p::{Multiaddr, PeerId}; -use lumina_node::events::{EventSubscriber, NodeEventInfo}; use serde::{Deserialize, Serialize}; use serde_wasm_bindgen::{from_value, to_value}; use thiserror::Error; use tokio::sync::mpsc; -use tracing::{debug, error, info, warn}; +use tracing::{error, info, warn}; use wasm_bindgen::prelude::*; use wasm_bindgen_futures::spawn_local; -use web_sys::{BroadcastChannel, MessageEvent, SharedWorker}; +use web_sys::{BroadcastChannel, SharedWorker}; use lumina_node::blockstore::IndexedDbBlockstore; +use lumina_node::events::{EventSubscriber, NodeEventInfo}; use lumina_node::node::{Node, SyncingInfo}; use lumina_node::store::{IndexedDbStore, SamplingMetadata, Store}; +use crate::client::WasmNodeConfig; +use crate::commands::{NodeCommand, SingleHeaderQuery, WorkerResponse}; use crate::error::{Context, Error, Result}; -use crate::node::WasmNodeConfig; +use crate::ports::{ClientMessage, WorkerServer}; use crate::utils::{random_id, WorkerSelf}; -use crate::worker::channel::{ - DedicatedWorkerMessageServer, MessageServer, SharedWorkerMessageServer, WorkerMessage, -}; -use crate::worker::commands::{NodeCommand, SingleHeaderQuery, WorkerResponse}; use crate::wrapper::libp2p::NetworkInfoSnapshot; -mod channel; -pub(crate) mod commands; - -pub(crate) use channel::{AnyWorker, WorkerClient}; - -const WORKER_MESSAGE_SERVER_INCOMING_QUEUE_LENGTH: usize = 64; - #[derive(Debug, Serialize, Deserialize, Error)] pub enum WorkerError { /// Worker is initialised, but the node has not been started yet. Use [`NodeDriver::start`]. @@ -49,12 +40,74 @@ pub enum WorkerError { NodeError(Error), } +#[wasm_bindgen] struct NodeWorker { + event_channel_name: String, + node: Option, + request_server: WorkerServer, + _control_channel: mpsc::UnboundedSender, +} + +struct NodeWorkerInstance { node: Node, events_channel_name: String, } +#[wasm_bindgen] impl NodeWorker { + #[wasm_bindgen(constructor)] + pub fn new(port_like_object: JsValue) -> Self { + info!("Created lumina worker"); + + let request_server = WorkerServer::new(); + let control_channel = request_server.get_control_channel(); + + control_channel + .send(ClientMessage::AddConnection(port_like_object)) + .expect("control channel should be ready to receive now"); + + Self { + event_channel_name: format!("NodeEventChannel-{}", random_id()), + node: None, + request_server, + _control_channel: control_channel, + } + } + + pub async fn run(&mut self) -> Result<(), Error> { + loop { + let (client_id, command) = self.request_server.recv().await?; + + let response = match &mut self.node { + Some(node) => node.process_command(command).await, + node @ None => match command { + NodeCommand::InternalPing => WorkerResponse::InternalPong, + NodeCommand::IsRunning => WorkerResponse::IsRunning(false), + NodeCommand::GetEventsChannelName => { + WorkerResponse::EventsChannelName(self.event_channel_name.clone()) + } + NodeCommand::StartNode(config) => { + match NodeWorkerInstance::new(&self.event_channel_name, config).await { + Ok(instance) => { + let _ = node.insert(instance); + WorkerResponse::NodeStarted(Ok(())) + } + Err(e) => WorkerResponse::NodeStarted(Err(e)), + } + } + _ => { + warn!("Worker not running"); + WorkerResponse::NodeNotRunning + } + }, + }; + + self.request_server.respond_to(client_id, response); + } + } +} + +impl NodeWorkerInstance { async fn new(events_channel_name: &str, config: WasmNodeConfig) -> Result { let config = config.into_node_config().await?; @@ -233,77 +286,11 @@ impl NodeWorker { SharedWorker::worker_self().close(); WorkerResponse::WorkerClosed(()) } + NodeCommand::InternalPing => WorkerResponse::InternalPong, } } } -#[wasm_bindgen] -pub async fn run_worker(queued_events: Vec) -> Result<()> { - info!("Entered run_worker"); - let (tx, mut rx) = mpsc::channel(WORKER_MESSAGE_SERVER_INCOMING_QUEUE_LENGTH); - let events_channel_name = format!("NodeEventChannel-{}", random_id()); - - let mut message_server: Box = if SharedWorker::is_worker_type() { - Box::new(SharedWorkerMessageServer::new(tx.clone(), queued_events)) - } else { - Box::new(DedicatedWorkerMessageServer::new(tx.clone(), queued_events).await) - }; - - info!("Entering worker message loop"); - let mut worker = None; - while let Some(message) = rx.recv().await { - match message { - WorkerMessage::NewConnection(connection) => { - message_server.add(connection); - } - WorkerMessage::InvalidCommandReceived(client_id) => { - message_server.respond_err_to(client_id, WorkerError::InvalidCommandReceived); - } - WorkerMessage::Command((command, client_id)) => { - debug!("received from {client_id:?}: {command:?}"); - let Some(worker) = &mut worker else { - match command { - NodeCommand::IsRunning => { - message_server.respond_to(client_id, WorkerResponse::IsRunning(false)); - } - NodeCommand::GetEventsChannelName => { - message_server.respond_to( - client_id, - WorkerResponse::EventsChannelName(events_channel_name.clone()), - ); - } - NodeCommand::StartNode(config) => { - match NodeWorker::new(&events_channel_name, config).await { - Ok(node) => { - worker = Some(node); - message_server - .respond_to(client_id, WorkerResponse::NodeStarted(Ok(()))); - } - Err(e) => { - message_server - .respond_to(client_id, WorkerResponse::NodeStarted(Err(e))); - } - }; - } - _ => { - warn!("Worker not running"); - message_server.respond_err_to(client_id, WorkerError::NodeNotRunning); - } - } - continue; - }; - - let response = worker.process_command(command).await; - message_server.respond_to(client_id, response); - } - } - } - - info!("Channel to WorkerMessageServer closed, exiting the SharedWorker"); - - Ok(()) -} - async fn event_forwarder_task(mut events_sub: EventSubscriber, events_channel: BroadcastChannel) { #[derive(Serialize)] struct Event { diff --git a/node-wasm/src/worker/channel.rs b/node-wasm/src/worker/channel.rs deleted file mode 100644 index 4ae694f1..00000000 --- a/node-wasm/src/worker/channel.rs +++ /dev/null @@ -1,381 +0,0 @@ -use std::fmt::{self, Debug}; - -use serde_wasm_bindgen::{from_value, to_value}; -use tokio::sync::{mpsc, Mutex}; -use tracing::{debug, error, info, warn}; -use wasm_bindgen::prelude::*; -use wasm_bindgen_futures::spawn_local; -use web_sys::{ - DedicatedWorkerGlobalScope, MessageEvent, MessagePort, SharedWorker, Worker, WorkerOptions, - WorkerType, -}; - -use crate::error::{Context, Error, Result}; -use crate::node::NodeWorkerKind; -use crate::utils::WorkerSelf; -use crate::worker::commands::{NodeCommand, WorkerResponse}; -use crate::worker::WorkerError; - -type WireMessage = Result; -type WorkerClientConnection = (MessagePort, Closure); - -/// Access to sending channel is protected by mutex to make sure we only can hold a single -/// writable instance from JS. Thus we expect to have at most 1 message in-flight. -const WORKER_CHANNEL_SIZE: usize = 1; - -/// `WorkerClient` is responsible for sending messages to and receiving responses from [`WorkerMessageServer`]. -/// It covers JS details like callbacks, having to synchronise requests, responses, and exposes -/// simple RPC-like function call interface. -pub(crate) struct WorkerClient { - worker: AnyWorker, - response_channel: Mutex>, - _onmessage: Closure, - _onerror: Closure, -} - -impl WorkerClient { - /// Create a new WorkerClient to control newly created Shared or Dedicated Worker running - /// MessageServer - pub(crate) fn new(worker: AnyWorker) -> Self { - let (response_tx, response_rx) = mpsc::channel(WORKER_CHANNEL_SIZE); - - let onmessage = worker.setup_on_message_callback(response_tx); - let onerror = worker.setup_on_error_callback(); - - Self { - worker, - response_channel: Mutex::new(response_rx), - _onmessage: onmessage, - _onerror: onerror, - } - } - - /// Send command to lumina and wait for a response. - /// - /// Response enum variant can be converted into appropriate type at runtime with a provided - /// [`CheckableResponseExt`] helper. - /// - /// [`CheckableResponseExt`]: crate::utils::CheckableResponseExt - pub(crate) async fn exec(&self, command: NodeCommand) -> Result { - let mut response_channel = self.response_channel.lock().await; - self.send(command) - .map_err(WorkerError::WorkerCommunicationError)?; - - let message: WireMessage = response_channel - .recv() - .await - .expect("response channel should never be dropped"); - - message - } - - fn send(&self, command: NodeCommand) -> Result<()> { - let command_value = - to_value(&command).context("could not serialise worker command to be sent")?; - match &self.worker { - AnyWorker::DedicatedWorker(worker) => worker - .post_message(&command_value) - .context("could not send command to worker"), - AnyWorker::SharedWorker(worker) => worker - .port() - .post_message(&command_value) - .context("could not send command to worker"), - } - } -} - -pub(crate) enum AnyWorker { - DedicatedWorker(Worker), - SharedWorker(SharedWorker), -} - -impl From for AnyWorker { - fn from(worker: SharedWorker) -> Self { - AnyWorker::SharedWorker(worker) - } -} - -impl From for AnyWorker { - fn from(worker: Worker) -> Self { - AnyWorker::DedicatedWorker(worker) - } -} - -impl AnyWorker { - pub(crate) fn new(kind: NodeWorkerKind, url: &str, name: &str) -> Result { - let opts = WorkerOptions::new(); - opts.set_type(WorkerType::Module); - opts.set_name(name); - - Ok(match kind { - NodeWorkerKind::Shared => { - info!("Starting SharedWorker"); - AnyWorker::SharedWorker( - SharedWorker::new_with_worker_options(url, &opts) - .context("could not create SharedWorker")?, - ) - } - NodeWorkerKind::Dedicated => { - info!("Starting Worker"); - AnyWorker::DedicatedWorker( - Worker::new_with_options(url, &opts).context("could not create Worker")?, - ) - } - }) - } - - fn setup_on_message_callback( - &self, - response_tx: mpsc::Sender, - ) -> Closure { - let onmessage_callback = move |ev: MessageEvent| { - let response_tx = response_tx.clone(); - spawn_local(async move { - let data: WireMessage = match from_value(ev.data()) { - Ok(jsvalue) => jsvalue, - Err(e) => { - error!("WorkerClient could not convert from JsValue: {e}"); - let error = Error::from(e).context("could not deserialise worker response"); - Err(WorkerError::WorkerCommunicationError(error)) - } - }; - - if let Err(e) = response_tx.send(data).await { - error!("message forwarding channel closed, should not happen: {e}"); - } - }) - }; - - let onmessage = Closure::new(onmessage_callback); - match self { - AnyWorker::SharedWorker(worker) => { - let message_port = worker.port(); - message_port.set_onmessage(Some(onmessage.as_ref().unchecked_ref())); - } - AnyWorker::DedicatedWorker(worker) => { - worker.set_onmessage(Some(onmessage.as_ref().unchecked_ref())) - } - } - onmessage - } - - fn setup_on_error_callback(&self) -> Closure { - let onerror = Closure::new(|ev: MessageEvent| { - error!("received error from Worker: {:?}", ev); - }); - match self { - AnyWorker::SharedWorker(worker) => { - worker.set_onerror(Some(onerror.as_ref().unchecked_ref())) - } - AnyWorker::DedicatedWorker(worker) => { - worker.set_onerror(Some(onerror.as_ref().unchecked_ref())) - } - } - - onerror - } -} - -#[derive(Debug, Clone, Copy)] -pub(super) struct ClientId(usize); - -impl fmt::Display for ClientId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Client({})", self.0) - } -} - -pub(super) enum WorkerMessage { - NewConnection(MessagePort), - InvalidCommandReceived(ClientId), - Command((NodeCommand, ClientId)), -} - -impl From<(MessageEvent, ClientId)> for WorkerMessage { - fn from(value: (MessageEvent, ClientId)) -> Self { - let (event, client) = value; - match from_value(event.data()) { - Ok(command) => { - debug!("received command from client {client}: {command:#?}"); - WorkerMessage::Command((command, client)) - } - Err(e) => { - warn!("could not deserialize message from client {client}: {e}"); - WorkerMessage::InvalidCommandReceived(client) - } - } - } -} - -pub(super) trait MessageServer { - fn send_response(&self, client: ClientId, message: WireMessage); - fn add(&mut self, port: MessagePort); - - fn respond_to(&self, client: ClientId, msg: WorkerResponse) { - self.send_response(client, Ok(msg)) - } - - fn respond_err_to(&self, client: ClientId, error: WorkerError) { - self.send_response(client, Err(error)) - } -} - -pub(super) struct SharedWorkerMessageServer { - // same onconnect callback is used throughtout entire Worker lifetime. - // Keep a reference to make sure it doesn't get dropped. - _onconnect: Closure, - - // keep a MessagePort for each client to send messages over, as well as callback responsible - // for forwarding messages back - clients: Vec, - - // sends events back to the main loop for processing - command_channel: mpsc::Sender, -} - -impl SharedWorkerMessageServer { - pub fn new(command_channel: mpsc::Sender, queued: Vec) -> Self { - let worker_scope = SharedWorker::worker_self(); - let onconnect = get_client_connect_callback(command_channel.clone()); - worker_scope.set_onconnect(Some(onconnect.as_ref().unchecked_ref())); - - let mut server = Self { - _onconnect: onconnect, - clients: Vec::with_capacity(usize::max(queued.len(), 1)), - command_channel, - }; - - for event in queued { - if let Ok(port) = event.ports().at(0).dyn_into() { - server.add(port); - } else { - error!("received onconnect event without MessagePort, should not happen"); - } - } - - server - } -} - -impl MessageServer for SharedWorkerMessageServer { - fn add(&mut self, port: MessagePort) { - let client_id = ClientId(self.clients.len()); - - let client_message_callback = - get_client_message_callback(self.command_channel.clone(), client_id); - - self.clients.push((port, client_message_callback)); - - let (port, callback) = self.clients.last().unwrap(); - port.set_onmessage(Some(callback.as_ref().unchecked_ref())); - - info!("SharedWorker ready to receive commands from client {client_id}"); - } - - fn send_response(&self, client: ClientId, message: WireMessage) { - let Some((client_port, _)) = self.clients.get(client.0) else { - error!("client {client} not found on the client list, should not happen"); - return; - }; - - if let Err(e) = client_port.post_message(&serialize_response_message(&message)) { - error!("could not post response message to client {client}: {e:?}"); - } - } -} - -pub(super) struct DedicatedWorkerMessageServer { - // same onmessage callback is used throughtout entire Worker lifetime. - // Keep a reference to make sure it doesn't get dropped. - _onmessage: Closure, - // global scope we use to send messages - worker: DedicatedWorkerGlobalScope, -} - -impl DedicatedWorkerMessageServer { - pub async fn new( - command_channel: mpsc::Sender, - queued: Vec, - ) -> Self { - for event in queued { - let message = WorkerMessage::from((event, ClientId(0))); - - if let Err(e) = command_channel.send(message).await { - error!("command channel inside worker closed, should not happen: {e}"); - } - } - - let worker = Worker::worker_self(); - let onmessage = get_client_message_callback(command_channel, ClientId(0)); - worker.set_onmessage(Some(onmessage.as_ref().unchecked_ref())); - - Self { - _onmessage: onmessage, - worker, - } - } -} - -impl MessageServer for DedicatedWorkerMessageServer { - fn add(&mut self, _port: MessagePort) { - error!("DedicatedWorkerMessageServer::add called, should not happen"); - } - - fn send_response(&self, client: ClientId, message: WireMessage) { - if let Err(e) = self - .worker - .post_message(&serialize_response_message(&message)) - { - error!("could not post response message to client {client}: {e:?}"); - } - } -} - -fn get_client_connect_callback( - command_channel: mpsc::Sender, -) -> Closure { - Closure::new(move |ev: MessageEvent| { - let command_channel = command_channel.clone(); - spawn_local(async move { - let Ok(port) = ev.ports().at(0).dyn_into() else { - error!("received onconnect event without MessagePort, should not happen"); - return; - }; - - if let Err(e) = command_channel - .send(WorkerMessage::NewConnection(port)) - .await - { - error!("command channel inside worker closed, should not happen: {e}"); - } - }) - }) -} - -fn get_client_message_callback( - command_channel: mpsc::Sender, - client: ClientId, -) -> Closure { - Closure::new(move |ev: MessageEvent| { - let command_channel = command_channel.clone(); - spawn_local(async move { - let message = WorkerMessage::from((ev, client)); - - if let Err(e) = command_channel.send(message).await { - error!("command channel inside worker closed, should not happen: {e}"); - } - }) - }) -} - -fn serialize_response_message(message: &WireMessage) -> JsValue { - match to_value(message) { - Ok(jsvalue) => jsvalue, - Err(e) => { - warn!("provided response could not be coverted to JsValue: {e}"); - let error = Error::from(e).context("couldn't serialise worker response"); - to_value(&WorkerError::WorkerCommunicationError(error)) - .expect("something's very wrong, couldn't serialise serialisation error") - } - } -}