From 0896de47c80c25fcf6e3f9e339e3aa29b84ddcd4 Mon Sep 17 00:00:00 2001 From: Josh Wilson Date: Thu, 11 Jan 2024 13:55:59 +0100 Subject: [PATCH] feat: client webtransport-websys feat --- .github/workflows/merge.yml | 37 ++++ Cargo.lock | 209 +++++++++++++++++++---- README.md | 21 +++ sn_client/Cargo.toml | 24 ++- sn_client/src/api.rs | 41 ++++- sn_client/src/audit/mod.rs | 8 +- sn_client/src/error.rs | 3 +- sn_client/src/event.rs | 2 +- sn_client/src/files/download.rs | 6 +- sn_client/src/files/upload.rs | 3 +- sn_client/src/lib.rs | 65 +++++++ sn_client/src/wallet.rs | 6 +- sn_faucet/Cargo.toml | 4 +- sn_networking/Cargo.toml | 18 +- sn_networking/src/bootstrap.rs | 15 +- sn_networking/src/cmd.rs | 8 +- sn_networking/src/driver.rs | 99 ++++++++--- sn_networking/src/event.rs | 11 +- sn_networking/src/lib.rs | 46 ++++- sn_networking/src/metrics.rs | 8 +- sn_networking/src/network_discovery.rs | 9 +- sn_networking/src/record_store.rs | 26 ++- sn_networking/src/replication_fetcher.rs | 16 +- sn_node/Cargo.toml | 3 +- sn_node/tests/common/client.rs | 2 +- sn_node_rpc_client/Cargo.toml | 2 +- sn_node_rpc_client/src/lib.rs | 2 +- sn_peers_acquisition/Cargo.toml | 6 +- sn_peers_acquisition/src/lib.rs | 23 ++- sn_protocol/Cargo.toml | 13 +- sn_protocol/src/node_registry.rs | 19 ++- sn_testnet/Cargo.toml | 6 +- sn_testnet/src/check_testnet.rs | 5 +- sn_testnet/src/lib.rs | 26 ++- sn_testnet/src/main.rs | 4 +- sn_transfers/Cargo.toml | 5 +- 36 files changed, 614 insertions(+), 187 deletions(-) diff --git a/.github/workflows/merge.yml b/.github/workflows/merge.yml index e42c00e61a..4b55758825 100644 --- a/.github/workflows/merge.yml +++ b/.github/workflows/merge.yml @@ -121,6 +121,43 @@ jobs: # we do many more runs on the nightly run PROPTEST_CASES: 50 + wasm: + if: "!startsWith(github.event.head_commit.message, 'chore(release):')" + name: Wasm builds + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 + + - name: Install wasm-pack + run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh + + - name: Build client for wasm + # wasm pack doesnt support workspaces + # --dev to avoid a loong optimisation step + run: cd sn_client && wasm-pack build --dev + timeout-minutes: 30 + + websocket: + if: "!startsWith(github.event.head_commit.message, 'chore(release):')" + name: Standard Websocket builds + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 + + - name: Build all for `websockets` + run: cargo build --features="websockets" + timeout-minutes: 30 + e2e: if: "!startsWith(github.event.head_commit.message, 'chore(release):')" name: E2E tests diff --git a/Cargo.lock b/Cargo.lock index bcb66b476d..c8b5c81789 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -511,7 +511,16 @@ version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" dependencies = [ - "digest", + "digest 0.10.7", +] + +[[package]] +name = "block-buffer" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" +dependencies = [ + "generic-array", ] [[package]] @@ -719,16 +728,16 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "41daef31d7a747c5c847246f36de49ced6f7403b4cdabc807a97b5cc184cda7a" dependencies = [ "android-tzdata", "iana-time-zone", "js-sys", "num-traits", "wasm-bindgen", - "windows-targets 0.48.5", + "windows-targets 0.52.0", ] [[package]] @@ -739,9 +748,9 @@ checksum = "6e4de3bc4ea267985becf712dc6d9eed8b04c953b3fcfb339ebc87acd9804901" [[package]] name = "ciborium" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "effd91f6c78e5a4ace8a5d3c0b6bfaec9e2baaef55f3efc00e45fb2e477ee926" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" dependencies = [ "ciborium-io", "ciborium-ll", @@ -750,15 +759,15 @@ dependencies = [ [[package]] name = "ciborium-io" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdf919175532b369853f5d5e20b26b43112613fd6fe7aee757e35f7a44642656" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" [[package]] name = "ciborium-ll" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "defaa24ecc093c77630e6c15e17c51f5e187bf35ee514f4e2d67baaa96dae22b" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" dependencies = [ "ciborium-io", "half", @@ -909,6 +918,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "console_error_panic_hook" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06aeb73f470f66dcdbf7223caeebb85984942f22f1adb2a088cf9668146bbbc" +dependencies = [ + "cfg-if", + "wasm-bindgen", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -1108,7 +1127,7 @@ dependencies = [ "cfg-if", "cpufeatures", "curve25519-dalek-derive", - "digest", + "digest 0.10.7", "fiat-crypto", "platforms", "rustc_version", @@ -1234,13 +1253,22 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" +[[package]] +name = "digest" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" +dependencies = [ + "generic-array", +] + [[package]] name = "digest" version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "block-buffer", + "block-buffer 0.10.4", "crypto-common", "subtle", ] @@ -1649,6 +1677,10 @@ name = "futures-timer" version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" +dependencies = [ + "gloo-timers", + "send_wrapper 0.4.0", +] [[package]] name = "futures-util" @@ -1737,6 +1769,18 @@ dependencies = [ "walkdir", ] +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "group" version = "0.12.1" @@ -1771,9 +1815,13 @@ dependencies = [ [[package]] name = "half" -version = "1.8.2" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7" +checksum = "bc52e53916c08643f1b56ec082790d1e86a32e58dc5268f897f313fbae7b4872" +dependencies = [ + "cfg-if", + "crunchy", +] [[package]] name = "hashbrown" @@ -1918,7 +1966,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" dependencies = [ - "digest", + "digest 0.10.7", ] [[package]] @@ -2207,6 +2255,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" dependencies = [ "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", ] [[package]] @@ -2343,7 +2394,8 @@ dependencies = [ "libp2p-swarm 0.44.1", "libp2p-tcp", "libp2p-upnp", - "libp2p-webtransport-websys", + "libp2p-websocket", + "libp2p-websocket-websys", "libp2p-yamux", "multiaddr", "pin-project", @@ -2779,6 +2831,7 @@ dependencies = [ "fnv", "futures", "futures-timer", + "getrandom", "instant", "libp2p-core 0.41.2", "libp2p-identity", @@ -2790,6 +2843,7 @@ dependencies = [ "tokio", "tracing", "void", + "wasm-bindgen-futures", ] [[package]] @@ -2842,9 +2896,9 @@ dependencies = [ [[package]] name = "libp2p-upnp" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "963eb8a174f828f6a51927999a9ab5e45dfa9aa2aa5fed99aa65f79de6229464" +checksum = "b49cc89949bf0e06869297cd4fe2c132358c23fe93e76ad43950453df4da3d35" dependencies = [ "futures", "futures-timer", @@ -2857,23 +2911,40 @@ dependencies = [ ] [[package]] -name = "libp2p-webtransport-websys" -version = "0.2.0" +name = "libp2p-websocket" +version = "0.43.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "840b63681e3bedbdb3df3e4f2dd48e9a20d2c6714264829ab90c6fce8549d627" +checksum = "f4846d51afd08180e164291c3754ba30dd4fbac6fac65571be56403c16431a5e" dependencies = [ + "either", "futures", - "js-sys", + "futures-rustls", "libp2p-core 0.41.2", "libp2p-identity", - "libp2p-noise", - "multiaddr", - "multihash", - "send_wrapper", + "parking_lot", + "pin-project-lite", + "rw-stream-sink", + "soketto", + "tracing", + "url", + "webpki-roots", +] + +[[package]] +name = "libp2p-websocket-websys" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "550e578dcc9cd572be9dd564831d1f5efe8e6661953768b1d56c1d462855bf6f" +dependencies = [ + "bytes", + "futures", + "js-sys", + "libp2p-core 0.41.2", + "parking_lot", + "send_wrapper 0.6.0", "thiserror", "tracing", "wasm-bindgen", - "wasm-bindgen-futures", "web-sys", ] @@ -4484,9 +4555,9 @@ dependencies = [ [[package]] name = "self_encryption" -version = "0.29.0" +version = "0.29.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9ca43227f2459c3bd7f827d672e9b21db86284f8f36014221a1827d0243b17" +checksum = "1ab2cd87e583738aba86278972e9116e2aabdb7fceda2be1fb3abe543be2336e" dependencies = [ "aes", "bincode", @@ -4513,14 +4584,17 @@ version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" +[[package]] +name = "send_wrapper" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" + [[package]] name = "send_wrapper" version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" -dependencies = [ - "futures-core", -] [[package]] name = "serde" @@ -4587,6 +4661,19 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha-1" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" +dependencies = [ + "block-buffer 0.9.0", + "cfg-if", + "cpufeatures", + "digest 0.9.0", + "opaque-debug", +] + [[package]] name = "sha1" version = "0.10.6" @@ -4595,7 +4682,7 @@ checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", "cpufeatures", - "digest", + "digest 0.10.7", ] [[package]] @@ -4606,7 +4693,7 @@ checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if", "cpufeatures", - "digest", + "digest 0.10.7", ] [[package]] @@ -4711,6 +4798,7 @@ dependencies = [ "backoff", "blsttc", "bytes", + "console_error_panic_hook", "custom_debug", "eyre", "futures", @@ -4727,6 +4815,7 @@ dependencies = [ "self_encryption", "serde", "sn_networking", + "sn_peers_acquisition", "sn_protocol", "sn_registers", "sn_transfers", @@ -4735,6 +4824,11 @@ dependencies = [ "tiny-keccak", "tokio", "tracing", + "tracing-wasm", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasmtimer", + "web-sys", "xor_name", ] @@ -4812,6 +4906,8 @@ dependencies = [ "tiny-keccak", "tokio", "tracing", + "wasm-bindgen-futures", + "wasmtimer", "xor_name", ] @@ -4917,7 +5013,6 @@ dependencies = [ "crdts", "custom_debug", "dirs-next", - "getrandom", "hex", "libp2p 0.53.2", "prost 0.9.0", @@ -5004,9 +5099,9 @@ dependencies = [ [[package]] name = "snow" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58021967fd0a5eeeb23b08df6cc244a4d4a5b4aec1d27c9e02fad1a58b4cd74e" +checksum = "2e87c18a6608909007e75a60e04d03eda77b601c94de1c74d9a9dc2c04ab789a" dependencies = [ "aes-gcm", "blake2", @@ -5029,6 +5124,21 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "soketto" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d1c5305e39e09653383c2c7244f2f78b3bcae37cf50c64cb4789c9f5096ec2" +dependencies = [ + "base64 0.13.1", + "bytes", + "futures", + "httparse", + "log", + "rand", + "sha-1", +] + [[package]] name = "spin" version = "0.5.2" @@ -5718,6 +5828,17 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-wasm" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4575c663a174420fa2d78f4108ff68f65bf2fbb7dd89f33749b6e826b3626e07" +dependencies = [ + "tracing", + "tracing-subscriber", + "wasm-bindgen", +] + [[package]] name = "try-lock" version = "0.2.5" @@ -6055,6 +6176,20 @@ version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" +[[package]] +name = "wasmtimer" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f656cd8858a5164932d8a90f936700860976ec21eb00e0fe2aa8cab13f6b4cf" +dependencies = [ + "futures", + "js-sys", + "parking_lot", + "pin-utils", + "slab", + "wasm-bindgen", +] + [[package]] name = "web-sys" version = "0.3.67" diff --git a/README.md b/README.md index fb203278f7..5a97e8d96f 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,27 @@ The Data on the Safe Network is Decentralised, Autonomous, and built atop of Ka - [Faucet](https://github.com/maidsafe/sn_faucet/blob/master/README.md) The local faucet server, used to claim genesis and request tokens from the network. - [Node RPC](https://github.com/maidsafe/sn_node_rpc_client/blob/master/README.md) The RPC server used by the nodes to expose API calls to the outside world. +#### Transport Protocols and Architectures + +The Safe Network uses `quic` as the default transport protocol. + +The `websockets` feature is available for the `sn_networking` crate, and above, and will allow for tcp over websockets. + +If building for `wasm32` then `websockets` are enabled by default as this is the only method avilable to communicate with a network as things stand. (And that network must have `websockets` enabled.) + +##### Building for wasm32 + +- Install [wasm-pack](https://rustwasm.github.io/wasm-pack/installer/) +- `cd sn_client && wasm-pack build` + +You can then pull this package into a web app eg, to use it. + +eg `await safe.get_data("/ip4/127.0.0.1/tcp/59324/ws/p2p/12D3KooWG6kyBwLVHj5hYK2SqGkP4GqrCz5gfwsvPBYic4c4TeUz","9d7e115061066126482a229822e6d68737bd67d826c269762c0f64ce87af6b4c")` + +#### Browser usage + +Browser usage is highly experimental, but the wasm32 target for `sn_client` _should_ work here. YMMV until stabilised. + ### For the Technical - [Logging](https://github.com/maidsafe/sn_logging/blob/master/README.md) The generalised logging crate used by the safe network (backed by the tracing crate). diff --git a/sn_client/Cargo.toml b/sn_client/Cargo.toml index 6a709b3602..c7808ae609 100644 --- a/sn_client/Cargo.toml +++ b/sn_client/Cargo.toml @@ -10,16 +10,12 @@ readme = "README.md" repository = "https://github.com/maidsafe/safe_network" version = "0.102.9" -[lib] -crate-type = ["cdylib", "rlib"] - - [features] -default=["quic"] +default=[] local-discovery=["sn_networking/local-discovery"] open-metrics = ["sn_networking/open-metrics", "prometheus-client"] # required to pass on flag to node builds -quic = ["sn_networking/quic"] +websockets = ["sn_networking/websockets"] [dependencies] async-trait = "0.1" @@ -45,7 +41,7 @@ sn_transfers = { path = "../sn_transfers", version = "0.14.41" } tempfile = "3.6.0" thiserror = "1.0.23" tiny-keccak = "~2.0.2" -tokio = { version = "1.32.0", features = ["io-util", "macros", "parking_lot", "rt", "sync", "time", "fs"] } +tokio = { version = "1.35.0", features = ["io-util", "macros", "rt", "sync", "time"] } tracing = { version = "~0.1.26" } xor_name = "5.0.0" @@ -58,6 +54,18 @@ libp2p-identity = { version="0.2.7", features = ["rand"] } workspace = true +# to allow wasm compilation +[lib] +crate-type = ["cdylib", "rlib"] + + [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.2.12", features = ["js"] } -sn_networking = { path = "../sn_networking", version = "0.12.37" } +wasm-bindgen = "0.2.90" +wasm-bindgen-futures = "0.4.40" +sn_peers_acquisition= { path="../sn_peers_acquisition", version = "0.2.0" } +console_error_panic_hook = "0.1.6" +tracing-wasm = "0.2.1" +wasmtimer = "0.2.0" +web-sys = { version= "0.3.22", features=["console"] } + diff --git a/sn_client/src/api.rs b/sn_client/src/api.rs index 42801ca8bc..8751436706 100644 --- a/sn_client/src/api.rs +++ b/sn_client/src/api.rs @@ -38,23 +38,38 @@ use sn_protocol::{ }; use sn_registers::{Permissions, SignedRegister}; use sn_transfers::{CashNote, CashNoteRedemption, MainPubkey, NanoTokens, Payment, SignedSpend}; +#[cfg(not(target_arch = "wasm32"))] +use std::time::Instant; use std::{ collections::{HashMap, HashSet}, num::NonZeroUsize, path::PathBuf, - time::Duration, }; +#[cfg(not(target_arch = "wasm32"))] use tokio::task::spawn; +use tokio::time::Duration; +#[cfg(not(target_arch = "wasm32"))] +use tokio::time::{interval, timeout}; use tracing::trace; +#[cfg(target_arch = "wasm32")] +use wasm_bindgen_futures::spawn_local as spawn; +#[cfg(target_arch = "wasm32")] +use wasmtimer::std::Instant; +#[cfg(target_arch = "wasm32")] +use wasmtimer::tokio::{interval, timeout}; use xor_name::XorName; /// The maximum duration the client will wait for a connection to the network before timing out. -const CONNECTION_TIMEOUT: Duration = Duration::from_secs(180); +const CONNECTION_TIMEOUT: Duration = Duration::from_secs(30); /// The timeout duration for the client to receive any response from the network. const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(30); impl Client { + /// A quick client that only takes some peers to connect to + pub async fn quick_start(peers: Option>) -> Result { + Self::new(SecretKey::random(), peers, false, None, None).await + } /// Instantiate a new client. /// /// Optionally specify the maximum time the client will wait for a connection to the network before timing out. @@ -74,9 +89,16 @@ impl Client { info!("Startup a client with peers {peers:?} and local {local:?} flag"); info!("Starting Kad swarm in client mode..."); + debug!("Starting Kad swarm in client mode.1.."); + trace!("Starting Kad swarm in client mode..2."); + + #[cfg(target_arch = "wasm32")] + let root_dir = PathBuf::from("dumb"); + #[cfg(not(target_arch = "wasm32"))] + let root_dir = std::env::temp_dir(); + trace!("Starting Kad swarm in client mode..{root_dir:?}."); - let mut network_builder = - NetworkBuilder::new(Keypair::generate_ed25519(), local, std::env::temp_dir()); + let mut network_builder = NetworkBuilder::new(Keypair::generate_ed25519(), local, root_dir); if enable_gossip { network_builder.enable_gossip(); @@ -127,8 +149,7 @@ impl Client { let _event_handler = spawn(async move { let mut peers_added: usize = 0; loop { - match tokio::time::timeout(INACTIVITY_TIMEOUT, network_event_receiver.recv()).await - { + match timeout(INACTIVITY_TIMEOUT, network_event_receiver.recv()).await { Ok(event) => { let the_event = match event { Some(the_event) => the_event, @@ -138,7 +159,7 @@ impl Client { } }; - let start = std::time::Instant::now(); + let start = Instant::now(); let event_string = format!("{the_event:?}"); if let Err(err) = client_clone.handle_network_event(the_event, &mut peers_added) @@ -164,7 +185,8 @@ impl Client { // loop to connect to the network let mut is_connected = false; let connection_timeout = connection_timeout.unwrap_or(CONNECTION_TIMEOUT); - let mut connection_timeout_interval = tokio::time::interval(connection_timeout); + + let mut connection_timeout_interval = interval(connection_timeout); // first tick completes immediately connection_timeout_interval.tick().await; @@ -411,7 +433,6 @@ impl Client { Ok(self.network.put_record(record, &put_cfg).await?) } - /// Retrieve a `Chunk` from the kad network. pub async fn get_chunk(&self, address: ChunkAddress, show_holders: bool) -> Result { info!("Getting chunk: {address:?}"); let key = NetworkAddress::from_chunk_address(address).to_record_key(); @@ -667,9 +688,11 @@ impl Client { // now we try and get batched chunks, keep track of any that fail // Iterate over each uploaded chunk let mut verify_handles = Vec::new(); + for (name, chunk_path) in chunks_batch.iter().cloned() { let client = self.clone(); // Spawn a new task to fetch each chunk concurrently + // this is specifically tokio here, as wasm-bindgen-futures breaks this let handle = tokio::spawn(async move { // make sure the chunk is stored; let chunk = Chunk::new(Bytes::from(std::fs::read(&chunk_path)?)); diff --git a/sn_client/src/audit/mod.rs b/sn_client/src/audit/mod.rs index ca9eb7413d..8012490cc2 100644 --- a/sn_client/src/audit/mod.rs +++ b/sn_client/src/audit/mod.rs @@ -21,7 +21,11 @@ use sn_transfers::{ CashNoteRedemption, SignedSpend, SpendAddress, Transfer, WalletError, WalletResult, NETWORK_ROYALTIES_PK, }; +#[cfg(not(target_arch = "wasm32"))] +use std::time::Instant; use std::{collections::BTreeSet, iter::Iterator, path::Path}; +#[cfg(target_arch = "wasm32")] +use wasmtimer::std::Instant; impl Client { /// Verify that a spend is valid on the network. @@ -64,7 +68,7 @@ impl Client { let mut txs_to_verify = BTreeSet::from_iter([first_spend.spend.parent_tx]); let mut depth = 0; let mut verified_tx = BTreeSet::new(); - let start = std::time::Instant::now(); + let start = Instant::now(); while !txs_to_verify.is_empty() { let mut next_gen_tx = BTreeSet::new(); @@ -172,7 +176,7 @@ impl Client { let mut all_utxos = BTreeSet::new(); let mut verified_tx = BTreeSet::new(); let mut gen = 0; - let start = std::time::Instant::now(); + let start = Instant::now(); while !txs_to_follow.is_empty() { let mut next_gen_tx = BTreeSet::new(); diff --git a/sn_client/src/error.rs b/sn_client/src/error.rs index 2609fe4048..7dd1b7c3af 100644 --- a/sn_client/src/error.rs +++ b/sn_client/src/error.rs @@ -11,8 +11,9 @@ pub(crate) type Result = std::result::Result; use super::ClientEvent; use sn_registers::{Entry, EntryHash}; use sn_transfers::SpendAddress; -use std::{collections::BTreeSet, time::Duration}; +use std::collections::BTreeSet; use thiserror::Error; +use tokio::time::Duration; /// Internal error. #[derive(Debug, Error)] diff --git a/sn_client/src/event.rs b/sn_client/src/event.rs index d1ff6d7780..3762b39b1e 100644 --- a/sn_client/src/event.rs +++ b/sn_client/src/event.rs @@ -47,7 +47,7 @@ pub enum ClientEvent { ConnectedToNetwork, /// No network activity has been received for a given duration /// we should error out - InactiveClient(std::time::Duration), + InactiveClient(tokio::time::Duration), /// Gossipsub message received on a topic the client has subscribed to GossipsubMsg { /// Topic the message was published on diff --git a/sn_client/src/files/download.rs b/sn_client/src/files/download.rs index a98a95fe68..4310d7c85b 100644 --- a/sn_client/src/files/download.rs +++ b/sn_client/src/files/download.rs @@ -16,8 +16,12 @@ use futures::StreamExt; use itertools::Itertools; use self_encryption::{decrypt_full_set, DataMap, EncryptedChunk, StreamSelfDecryptor}; use sn_protocol::storage::{Chunk, ChunkAddress}; -use std::{collections::HashMap, fs, path::PathBuf, time::Instant}; +#[cfg(not(target_arch = "wasm32"))] +use std::time::Instant; +use std::{collections::HashMap, fs, path::PathBuf}; use tokio::sync::mpsc::{self}; +#[cfg(target_arch = "wasm32")] +use wasmtimer::std::Instant; use xor_name::XorName; /// The events emitted from the download process. diff --git a/sn_client/src/files/upload.rs b/sn_client/src/files/upload.rs index 71164fc9e5..cf3e084931 100644 --- a/sn_client/src/files/upload.rs +++ b/sn_client/src/files/upload.rs @@ -169,6 +169,7 @@ impl FilesUpload { /// If you want to track the upload progress, use the `get_upload_events` method. pub async fn upload_chunks(&mut self, mut chunks: Vec<(XorName, PathBuf)>) -> Result<()> { trace!("Uploading chunks {:?}", chunks.len()); + // make sure we log that the event sender is absent atleast once self.logged_event_sender_absence = false; @@ -406,7 +407,7 @@ impl FilesUpload { verify_store: bool, ) -> (ChunkInfo, Result<()>) { let chunk_address = ChunkAddress::new(chunk_info.name); - let bytes = match tokio::fs::read(chunk_info.path.clone()).await { + let bytes = match std::fs::read(chunk_info.path.clone()) { Ok(bytes) => Bytes::from(bytes), Err(error) => { warn!("Chunk {chunk_address:?} could not be read from the system from {:?}. diff --git a/sn_client/src/lib.rs b/sn_client/src/lib.rs index 984f37b3c6..edc261c838 100644 --- a/sn_client/src/lib.rs +++ b/sn_client/src/lib.rs @@ -35,6 +35,71 @@ pub(crate) use error::Result; use sn_networking::Network; +#[cfg(target_arch = "wasm32")] +use console_error_panic_hook; +#[cfg(target_arch = "wasm32")] +use wasm_bindgen::prelude::*; +#[cfg(target_arch = "wasm32")] +use web_sys::console; + +// This is like the `main` function, except for JavaScript. +#[cfg(target_arch = "wasm32")] +#[wasm_bindgen(start)] +pub async fn main_js() -> std::result::Result<(), JsValue> { + // This provides better error messages in debug mode. + // It's disabled in release mode so it doesn't bloat up the file size. + // #[cfg(debug_assertions)] + console_error_panic_hook::set_once(); + + // Your code goes here! + console::log_1(&JsValue::from_str("Hello safe world!")); + + // Tracing + // TODO: dont log _everything_ + // right now it logs all libp2p entirely. + tracing_wasm::set_as_global_default(); + + Ok(()) +} + +/// A quick client that only takes some peers to connect to +#[wasm_bindgen] +#[cfg(target_arch = "wasm32")] +pub async fn get_data(peer: &str, data_address: &str) -> std::result::Result<(), JsError> { + let bytes = hex::decode(&data_address).expect("Input address is not a hex string"); + let xor_name = xor_name::XorName( + bytes + .try_into() + .expect("Failed to parse XorName from hex string"), + ); + + use sn_protocol::storage::ChunkAddress; + console::log_1(&JsValue::from_str(peer)); + + let the_peer = sn_peers_acquisition::parse_peer_addr(peer)?; + + console::log_1(&JsValue::from_str(&format!( + "Provided Peer was {the_peer:?}" + ))); + + // TODO: We need to tidy this up, the client loops forever in the browser, and eventually crashes + // it does _do things_ but errors surface, and even after getting data, it continues... + let client = Client::quick_start(Some(vec![the_peer])) + .await + .map_err(|e| JsError::new(&format!("Client could not start: {e:?}")))?; + + console::log_1(&JsValue::from_str("Client started {chunk:?}")); + + let chunk = client + .get_chunk(ChunkAddress::new(xor_name), false) + .await + .map_err(|e| JsError::new(&format!("Client get data failed: {e:?}")))?; + + console::log_1(&JsValue::from_str(&format!("Data found {chunk:?}"))); + + Ok(()) +} + /// Client API implementation to store and get data. #[derive(Clone)] pub struct Client { diff --git a/sn_client/src/wallet.rs b/sn_client/src/wallet.rs index dfa8fe57cb..3bcd8cff03 100644 --- a/sn_client/src/wallet.rs +++ b/sn_client/src/wallet.rs @@ -18,12 +18,16 @@ use sn_transfers::{ CashNote, DerivationIndex, LocalWallet, MainPubkey, NanoTokens, Payment, PaymentQuote, SignedSpend, SpendAddress, Transaction, Transfer, UniquePubkey, WalletError, WalletResult, }; +#[cfg(not(target_arch = "wasm32"))] +use std::time::Instant; use std::{ collections::{BTreeMap, BTreeSet}, iter::Iterator, - time::{Duration, Instant}, }; +use tokio::time::Duration; use tokio::{task::JoinSet, time::sleep}; +#[cfg(target_arch = "wasm32")] +use wasmtimer::std::Instant; use xor_name::XorName; /// A wallet client can be used to send and receive tokens to and from other wallets. diff --git a/sn_faucet/Cargo.toml b/sn_faucet/Cargo.toml index d4b7175adf..ac11d91d17 100644 --- a/sn_faucet/Cargo.toml +++ b/sn_faucet/Cargo.toml @@ -11,9 +11,7 @@ repository = "https://github.com/maidsafe/safe_network" version = "0.3.14" [features] -default = ["quic"] -# required to pass on flag to node builds -quic = ["sn_client/quic"] +default = [] [[bin]] path="src/main.rs" diff --git a/sn_networking/Cargo.toml b/sn_networking/Cargo.toml index efa49dfc3b..0466453322 100644 --- a/sn_networking/Cargo.toml +++ b/sn_networking/Cargo.toml @@ -11,10 +11,10 @@ repository = "https://github.com/maidsafe/safe_network" version = "0.12.37" [features] -default=["quic"] +default=["libp2p/quic"] local-discovery=["libp2p/mdns"] -quic=["libp2p/quic"] -tcp=["libp2p/tcp"] +# tcp is automatically enabled when compiling for wasm32 +websockets=["libp2p/tcp"] open-metrics=["libp2p/metrics", "prometheus-client", "hyper", "sysinfo"] [dependencies] @@ -24,7 +24,7 @@ futures = "~0.3.13" hyper = { version = "0.14", features = ["server", "tcp", "http1"], optional = true} itertools = "~0.11.0" custom_debug = "~0.5.0" -libp2p = { version="0.53", features = ["tokio", "dns", "kad", "macros", "request-response", "cbor","identify", "autonat", "noise", "tcp", "yamux", "gossipsub"] } +libp2p = { version="0.53", features = ["tokio", "dns", "kad", "macros", "request-response", "cbor","identify", "autonat", "noise", "tcp", "yamux", "gossipsub", "websocket"] } prometheus-client = { version = "0.22", optional = true } rand = { version = "~0.8.5", features = ["small_rng"] } rayon = "1.8.0" @@ -35,7 +35,7 @@ sn_transfers = { path = "../sn_transfers", version = "0.14.41" } sysinfo = { version = "0.29.0", default-features = false, optional = true } thiserror = "1.0.23" tiny-keccak = { version = "~2.0.2", features = [ "sha3" ] } -tokio = { version = "1.32.0", features = ["io-util", "macros", "parking_lot", "rt", "sync", "time"] } +tokio = { version = "1.32.0", features = ["io-util", "macros", "rt", "sync", "time"] } tracing = { version = "~0.1.26" } xor_name = "5.0.0" backoff = { version = "0.4.0", features = ["tokio"] } @@ -51,8 +51,12 @@ eyre = "0.6.8" workspace = true +# wasm build requirements +[lib] +crate-type = ["cdylib", "rlib"] [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.2.12", features = ["js"] } -libp2p = { version="0.53", features = ["tokio", "dns", "kad", "macros", "request-response", "cbor","identify", "autonat", "noise", "tcp", "yamux", "gossipsub", "webtransport-websys"] } - +libp2p = { version="0.53", features = ["tokio", "dns", "kad", "tcp", "macros", "request-response", "cbor","identify", "autonat", "noise", "yamux", "gossipsub", "websocket-websys", "wasm-bindgen"] } +wasmtimer = "0.2.0" +wasm-bindgen-futures = "0.4.40" diff --git a/sn_networking/src/bootstrap.rs b/sn_networking/src/bootstrap.rs index bdfeee2073..d5dd18f368 100644 --- a/sn_networking/src/bootstrap.rs +++ b/sn_networking/src/bootstrap.rs @@ -7,8 +7,15 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::{driver::PendingGetClosestType, SwarmDriver}; -use std::time::{Duration, Instant}; -use tokio::time::Interval; +#[cfg(not(target_arch = "wasm32"))] +use std::time::Instant; +use tokio::time::Duration; +#[cfg(not(target_arch = "wasm32"))] +use tokio::time::{interval, Interval}; +#[cfg(target_arch = "wasm32")] +use wasmtimer::std::Instant; +#[cfg(target_arch = "wasm32")] +use wasmtimer::tokio::{interval, Interval}; /// The interval in which kad.bootstrap is called pub(crate) const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5); @@ -128,7 +135,7 @@ impl ContinuousBootstrap { "It has been {LAST_PEER_ADDED_TIME_LIMIT:?} since we last added a peer to RT. Slowing down the continuous bootstrapping process" ); - let mut new_interval = tokio::time::interval(NO_PEER_ADDED_SLOWDOWN_INTERVAL); + let mut new_interval = interval(NO_PEER_ADDED_SLOWDOWN_INTERVAL); new_interval.tick().await; // the first tick completes immediately return (should_bootstrap, Some(new_interval)); } @@ -139,7 +146,7 @@ impl ContinuousBootstrap { let new_interval = BOOTSTRAP_INTERVAL * step; let new_interval = if new_interval > current_interval { info!("More peers have been added to our RT!. Slowing down the continuous bootstrapping process"); - let mut interval = tokio::time::interval(new_interval); + let mut interval = interval(new_interval); interval.tick().await; // the first tick completes immediately Some(interval) } else { diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs index 999ace3089..34999c571b 100644 --- a/sn_networking/src/cmd.rs +++ b/sn_networking/src/cmd.rs @@ -31,6 +31,11 @@ use std::{ use tokio::sync::oneshot; use xor_name::XorName; +#[cfg(not(target_arch = "wasm32"))] +use std::time::Instant; +#[cfg(target_arch = "wasm32")] +use wasmtimer::std::Instant; + /// Commands to send to the Swarm #[allow(clippy::large_enum_variant)] pub enum SwarmCmd { @@ -296,7 +301,7 @@ pub struct SwarmLocalState { impl SwarmDriver { pub(crate) fn handle_cmd(&mut self, cmd: SwarmCmd) -> Result<(), Error> { - let start = std::time::Instant::now(); + let start = Instant::now(); let mut cmd_string = ""; match cmd { SwarmCmd::TriggerIntervalReplication => { @@ -511,6 +516,7 @@ impl SwarmDriver { } SwarmCmd::Dial { addr, sender } => { cmd_string = "Dial"; + let mut addr_copy = addr.clone(); if let Some(peer_id) = multiaddr_pop_p2p(&mut addr_copy) { // Only consider the dial peer is bootstrap node when proper PeerId is provided. diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index 82c2a4403b..c72c4e0f72 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -26,17 +26,21 @@ use crate::{ Network, CLOSE_GROUP_SIZE, }; use futures::StreamExt; -#[cfg(not(feature = "tcp"))] +#[cfg(all(not(feature = "websockets"), not(target_arch = "wasm32")))] use libp2p::core::muxing::StreamMuxerBox; #[cfg(feature = "local-discovery")] use libp2p::mdns; // default transports -#[cfg(all(not(feature = "tcp"), not(target_arch = "wasm32")))] +#[cfg(all(not(feature = "websockets"), not(target_arch = "wasm32")))] use libp2p::quic::{tokio::Transport as TokioTransport, Config as TransportConfig}; -#[cfg(feature = "tcp")] + +#[cfg(all(not(target_arch = "wasm32"), feature = "websockets"))] +use libp2p::websocket::WsConfig; + +#[cfg(all(not(target_arch = "wasm32"), feature = "websockets"))] use libp2p::tcp::{tokio::Transport as TokioTransport, Config as TransportConfig}; #[cfg(target_arch = "wasm32")] -use libp2p::webtransport_websys::{Config as TransportConfig, Transport as TokioTransport}; +use libp2p::websocket_websys::Transport as WebSocketTransport; use libp2p::{ autonat, identity::Keypair, @@ -62,12 +66,27 @@ use std::{ net::SocketAddr, num::NonZeroUsize, path::PathBuf, - time::{Duration, Instant}, }; use tiny_keccak::{Hasher, Sha3}; use tokio::sync::{mpsc, oneshot}; +#[cfg(not(target_arch = "wasm32"))] +use tokio::time::interval; +use tokio::time::Duration; +#[cfg(target_arch = "wasm32")] +use wasmtimer::tokio::interval; + +#[cfg(not(target_arch = "wasm32"))] +use tokio::task::spawn; +#[cfg(target_arch = "wasm32")] +use wasm_bindgen_futures::spawn_local as spawn; + use tracing::warn; +#[cfg(not(target_arch = "wasm32"))] +use std::time::Instant; +#[cfg(target_arch = "wasm32")] +use wasmtimer::std::Instant; + /// The ways in which the Get Closest queries are used. pub(crate) enum PendingGetClosestType { /// The network discovery method is present at the networking layer @@ -325,21 +344,19 @@ impl NetworkBuilder { let listen_socket_addr = listen_addr.ok_or(Error::ListenAddressNotProvided)?; // Flesh out the multiaddress - let listen_addr = Multiaddr::from(listen_socket_addr.ip()); + let start_addr = Multiaddr::from(listen_socket_addr.ip()); - #[cfg(feature = "quic")] - let listen_addr = listen_addr - .with(Protocol::Udp(listen_socket_addr.port())) - .with(Protocol::QuicV1); - - #[cfg(target_arch = "wasm32")] - let listen_addr = listen_addr - .with(Protocol::Udp(listen_socket_addr.port())) - .with(Protocol::WebTransport); - - #[cfg(feature = "tcp")] - Multiaddr::from(listen_socket_addr.ip()).with(Protocol::Tcp(listen_socket_addr.port())); + let listen_addr = if cfg!(any(feature = "websockets", target_arch = "wasm32")) { + start_addr + .with(Protocol::Tcp(listen_socket_addr.port())) + .with(Protocol::Ws("/".into())) + } else { + start_addr + .with(Protocol::Udp(listen_socket_addr.port())) + .with(Protocol::QuicV1) + }; + debug!("Attempting to listen on: {listen_addr:?}"); let _listener_id = swarm_driver .swarm .listen_on(listen_addr) @@ -386,7 +403,11 @@ impl NetworkBuilder { ) -> Result<(Network, mpsc::Receiver, SwarmDriver)> { let peer_id = PeerId::from(self.keypair.public()); // vdash metric (if modified please notify at https://github.com/happybeing/vdash/issues): - info!("Node (PID: {}) with PeerId: {peer_id}", std::process::id()); + #[cfg(not(target_arch = "wasm32"))] + info!( + "Process (PID: {}) with PeerId: {peer_id}", + std::process::id() + ); info!( "Self PeerID {peer_id} is represented as kbucket_key {:?}", PrettyPrintKBucketKey(NetworkAddress::from_peer(peer_id).as_kbucket_key()) @@ -465,13 +486,29 @@ impl NetworkBuilder { libp2p::identify::Behaviour::new(cfg) }; - #[cfg(not(feature = "tcp"))] + // Default quic transport. + // cannot be built for wasm32 + #[cfg(all(not(feature = "websockets"), not(target_arch = "wasm32")))] let main_transport = TokioTransport::new(TransportConfig::new(&self.keypair)) .map(|(peer_id, muxer), _| (peer_id, StreamMuxerBox::new(muxer))) .boxed(); - #[cfg(feature = "tcp")] - let main_transport = TokioTransport::new(TransportConfig::default()) + #[cfg(target_arch = "wasm32")] + let main_transport = WebSocketTransport::default() + .upgrade(libp2p::core::upgrade::Version::V1) + .authenticate( + libp2p::noise::Config::new(&self.keypair) + .expect("Signing libp2p-noise static DH keypair failed."), + ) + .multiplex(libp2p::yamux::Config::default()) + .boxed(); + + #[cfg(all(not(target_arch = "wasm32"), feature = "websockets"))] + let tcp = TokioTransport::new(TransportConfig::default()); + + // tcp websocket transport for node builds + #[cfg(all(not(target_arch = "wasm32"), feature = "websockets"))] + let main_transport = WsConfig::new(tcp) .upgrade(libp2p::core::upgrade::Version::V1) .authenticate( libp2p::noise::Config::new(&self.keypair) @@ -553,20 +590,28 @@ impl NetworkBuilder { autonat, gossipsub, }; + + #[cfg(not(target_arch = "wasm32"))] let swarm_config = libp2p::swarm::Config::with_tokio_executor() .with_idle_connection_timeout(CONNECTION_KEEP_ALIVE_TIMEOUT); + #[cfg(target_arch = "wasm32")] + let swarm_config = libp2p::swarm::Config::with_wasm_executor() + .with_idle_connection_timeout(CONNECTION_KEEP_ALIVE_TIMEOUT); let swarm = Swarm::new(transport, behaviour, peer_id, swarm_config); + let bootstrap = ContinuousBootstrap::new(); + let replication_fetcher = ReplicationFetcher::new(peer_id); + let swarm_driver = SwarmDriver { swarm, self_peer_id: peer_id, local: self.local, is_client, connected_peers: 0, - bootstrap: ContinuousBootstrap::new(), + bootstrap, close_group: Default::default(), - replication_fetcher: ReplicationFetcher::new(peer_id), + replication_fetcher, #[cfg(feature = "open-metrics")] network_metrics, cmd_receiver: swarm_cmd_receiver, @@ -646,7 +691,7 @@ impl SwarmDriver { /// and command receiver messages, ensuring efficient handling of multiple /// asynchronous tasks. pub async fn run(mut self) { - let mut bootstrap_interval = tokio::time::interval(BOOTSTRAP_INTERVAL); + let mut bootstrap_interval = interval(BOOTSTRAP_INTERVAL); loop { tokio::select! { swarm_event = self.swarm.select_next_some() => { @@ -658,7 +703,7 @@ impl SwarmDriver { }, some_cmd = self.cmd_receiver.recv() => match some_cmd { Some(cmd) => { - let start = std::time::Instant::now(); + let start = Instant::now(); let cmd_string = format!("{cmd:?}"); if let Err(err) = self.handle_cmd(cmd) { warn!("Error while handling cmd: {err}"); @@ -688,7 +733,7 @@ impl SwarmDriver { let capacity = event_sender.capacity(); // push the event off thread so as to be non-blocking - let _handle = tokio::spawn(async move { + let _handle = spawn(async move { if capacity == 0 { warn!( "NetworkEvent channel is full. Await capacity to send: {:?}", diff --git a/sn_networking/src/event.rs b/sn_networking/src/event.rs index 8782b26afc..81d3075c2c 100644 --- a/sn_networking/src/event.rs +++ b/sn_networking/src/event.rs @@ -32,6 +32,11 @@ use libp2p::{ Multiaddr, PeerId, TransportError, }; +#[cfg(not(target_arch = "wasm32"))] +use std::time::Instant; +#[cfg(target_arch = "wasm32")] +use wasmtimer::std::Instant; + use sn_protocol::{ messages::{CmdResponse, Query, Request, Response}, storage::RecordType, @@ -40,9 +45,9 @@ use sn_protocol::{ use std::{ collections::{hash_map::Entry, HashSet}, fmt::{Debug, Formatter}, - time::{Duration, Instant}, }; use tokio::sync::oneshot; +use tokio::time::Duration; use tracing::{info, warn}; /// Our agent string has as a prefix that we can match against. @@ -199,7 +204,7 @@ impl SwarmDriver { // called individually on each behaviour. #[cfg(feature = "open-metrics")] self.network_metrics.record(&event); - let start = std::time::Instant::now(); + let start = Instant::now(); let event_string; match event { SwarmEvent::Behaviour(NodeEvent::MsgReceived(event)) => { @@ -718,7 +723,7 @@ impl SwarmDriver { fn handle_kad_event(&mut self, kad_event: kad::Event) -> Result<()> { #[cfg(feature = "open-metrics")] self.network_metrics.record(&kad_event); - let start = std::time::Instant::now(); + let start = Instant::now(); let event_string; match kad_event { diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index 06733dcac3..ea0ad314bd 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -56,13 +56,21 @@ use sn_transfers::{MainPubkey, NanoTokens, PaymentQuote}; use std::{ collections::{BTreeMap, HashMap}, path::PathBuf, - time::Duration, }; use tokio::sync::{ mpsc::{self, Sender}, oneshot, }; - +#[cfg(not(target_arch = "wasm32"))] +use tokio::task::spawn; +#[cfg(not(target_arch = "wasm32"))] +use tokio::time::sleep; +use tokio::time::Duration; +use tracing::trace; +#[cfg(target_arch = "wasm32")] +use wasm_bindgen_futures::spawn_local as spawn; +#[cfg(target_arch = "wasm32")] +use wasmtimer::tokio::sleep; /// The type of quote for a selected payee. pub type PayeeQuote = (PeerId, MainPubkey, PaymentQuote); @@ -87,6 +95,8 @@ pub const fn close_group_majority() -> usize { /// Max duration for all GET attempts const MAX_GET_RETRY_DURATION_MS: u64 = 6800; + +#[cfg(not(target_arch = "wasm32"))] const MAX_GET_RETRY_DURATION: Duration = Duration::from_millis(MAX_GET_RETRY_DURATION_MS); /// Max duration for all PUT attempts const MAX_PUT_RETRY_DURATION: Duration = Duration::from_millis(MAX_GET_RETRY_DURATION_MS * 3); @@ -333,7 +343,7 @@ impl Network { } else { MIN_WAIT_BEFORE_READING_A_PUT + MIN_WAIT_BEFORE_READING_A_PUT }; - tokio::time::sleep(waiting_time).await; + sleep(waiting_time).await; } Err(Error::FailedToVerifyChunkProof(chunk_address.clone())) @@ -415,10 +425,36 @@ impl Network { Ok(()) } + /// Get a record from the network + /// This differs from non-wasm32 builds as no retries are applied + #[cfg(target_arch = "wasm32")] + pub async fn get_record_from_network( + &self, + key: RecordKey, + cfg: &GetRecordCfg, + ) -> Result { + let pretty_key = PrettyPrintRecordKey::from(&key); + info!("Getting record from network of {pretty_key:?}. with cfg {cfg:?}",); + let (sender, receiver) = oneshot::channel(); + self.send_swarm_cmd(SwarmCmd::GetNetworkRecord { + key: key.clone(), + sender, + cfg: cfg.clone(), + })?; + let result = receiver.await.map_err(|e| { + error!("When fetching record {pretty_key:?}, encountered a channel error {e:?}"); + Error::InternalMsgChannelDropped + })?; + + result.map_err(Error::from) + } + /// Get the Record from the network /// Carry out re-attempts if required /// In case a target_record is provided, only return when fetched target. /// Otherwise count it as a failure when all attempts completed. + /// + #[cfg(not(target_arch = "wasm32"))] pub async fn get_record_from_network( &self, key: RecordKey, @@ -581,7 +617,7 @@ impl Network { .gen_range(MIN_WAIT_BEFORE_READING_A_PUT..MAX_WAIT_BEFORE_READING_A_PUT); // Small wait before we attempt to verify. // There will be `re-attempts` to be carried out within the later step anyway. - tokio::time::sleep(wait_duration).await; + sleep(wait_duration).await; debug!("Attempting to verify {pretty_key:?} after we've slept for {wait_duration:?}"); // Verify the record is stored, requiring re-attempts @@ -869,7 +905,7 @@ pub(crate) fn send_swarm_cmd(swarm_cmd_sender: Sender, cmd: SwarmCmd) } // Spawn a task to send the SwarmCmd and keep this fn sync - let _handle = tokio::spawn(async move { + let _handle = spawn(async move { if let Err(error) = swarm_cmd_sender.send(cmd).await { error!("Failed to send SwarmCmd: {}", error); } diff --git a/sn_networking/src/metrics.rs b/sn_networking/src/metrics.rs index 569002141c..96bf9c0722 100644 --- a/sn_networking/src/metrics.rs +++ b/sn_networking/src/metrics.rs @@ -8,8 +8,12 @@ use libp2p::metrics::{Metrics as Libp2pMetrics, Recorder}; use prometheus_client::{metrics::gauge::Gauge, registry::Registry}; -use std::time::Duration; use sysinfo::{Pid, PidExt, ProcessExt, ProcessRefreshKind, System, SystemExt}; +#[cfg(not(target_arch = "wasm32"))] +use tokio::time::sleep; +use tokio::time::Duration; +#[cfg(target_arch = "wasm32")] +use wasmtimer::tokio::sleep; const UPDATE_INTERVAL: Duration = Duration::from_secs(15); const TO_MB: u64 = 1_000_000; @@ -89,7 +93,7 @@ impl NetworkMetrics { let cpu_usage = process.cpu_usage() / core_count as f32; let _ = process_cpu_usage_percentage.set(cpu_usage as i64); } - tokio::time::sleep(UPDATE_INTERVAL).await; + sleep(UPDATE_INTERVAL).await; } }); } diff --git a/sn_networking/src/network_discovery.rs b/sn_networking/src/network_discovery.rs index 707dfeeef0..a5de038ec1 100644 --- a/sn_networking/src/network_discovery.rs +++ b/sn_networking/src/network_discovery.rs @@ -10,10 +10,11 @@ use libp2p::{kad::KBucketKey, PeerId}; use rand::{thread_rng, Rng}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use sn_protocol::NetworkAddress; -use std::{ - collections::{btree_map::Entry, BTreeMap}, - time::Instant, -}; +use std::collections::{btree_map::Entry, BTreeMap}; +#[cfg(not(target_arch = "wasm32"))] +use std::time::Instant; +#[cfg(target_arch = "wasm32")] +use wasmtimer::std::Instant; // The number of PeerId to generate when starting an instance of NetworkDiscovery const INITIAL_GENERATION_ATTEMPTS: usize = 10_000; diff --git a/sn_networking/src/record_store.rs b/sn_networking/src/record_store.rs index 13638ed6f4..adcbc7d2fe 100644 --- a/sn_networking/src/record_store.rs +++ b/sn_networking/src/record_store.rs @@ -22,6 +22,8 @@ use sn_protocol::{ NetworkAddress, PrettyPrintRecordKey, }; use sn_transfers::NanoTokens; +#[cfg(not(target_arch = "wasm32"))] +use std::time::Instant; use std::{ borrow::Cow, collections::{HashMap, HashSet}, @@ -30,6 +32,13 @@ use std::{ vec, }; use tokio::sync::mpsc; +#[cfg(not(target_arch = "wasm32"))] +use tokio::task::spawn; +#[cfg(target_arch = "wasm32")] +use wasm_bindgen_futures::spawn_local as spawn; +#[cfg(target_arch = "wasm32")] +use wasmtimer::std::Instant; + use xor_name::XorName; /// Max number of records a node can store @@ -117,7 +126,7 @@ impl NodeRecordStore { } fn read_from_disk<'a>(key: &Key, storage_dir: &Path) -> Option> { - let start = std::time::Instant::now(); + let start = Instant::now(); let filename = Self::key_to_hex(key); let file_path = storage_dir.join(&filename); @@ -252,7 +261,7 @@ impl NodeRecordStore { } let cloned_cmd_sender = self.swarm_cmd_sender.clone(); - tokio::spawn(async move { + spawn(async move { let cmd = match fs::write(&file_path, r.value) { Ok(_) => { // vdash metric (if modified please notify at https://github.com/happybeing/vdash/issues): @@ -393,7 +402,7 @@ impl RecordStore for NodeRecordStore { trace!("Unverified Record {record_key:?} try to validate and store"); let event_sender = self.network_event_sender.clone(); // push the event off thread so as to be non-blocking - let _handle = tokio::spawn(async move { + let _handle = spawn(async move { if let Err(error) = event_sender .send(NetworkEvent::UnverifiedRecord(record)) .await @@ -415,7 +424,7 @@ impl RecordStore for NodeRecordStore { let filename = Self::key_to_hex(k); let file_path = self.config.storage_dir.join(&filename); - let _handle = tokio::spawn(async move { + let _handle = spawn(async move { match fs::remove_file(file_path) { Ok(_) => { info!("Removed record from disk! filename: {filename}"); @@ -539,8 +548,9 @@ mod tests { }; use quickcheck::*; use sn_protocol::storage::{try_serialize_record, ChunkAddress}; - use std::{collections::BTreeMap, time::Duration}; + use std::collections::BTreeMap; use tokio::runtime::Runtime; + use tokio::time::{sleep, Duration}; const MULITHASH_CODE: u64 = 0x12; @@ -681,7 +691,7 @@ mod tests { { break; } - tokio::time::sleep(Duration::from_millis(100)).await; + sleep(Duration::from_millis(100)).await; iteration += 1; } if iteration == max_iterations { @@ -774,7 +784,7 @@ mod tests { { break; } - tokio::time::sleep(Duration::from_millis(100)).await; + sleep(Duration::from_millis(100)).await; iteration += 1; } if iteration == max_iterations { @@ -790,7 +800,7 @@ mod tests { if store.get(&retained_key).is_some() { break; } - tokio::time::sleep(Duration::from_millis(100)).await; + sleep(Duration::from_millis(100)).await; iteration += 1; } if iteration == max_iterations { diff --git a/sn_networking/src/replication_fetcher.rs b/sn_networking/src/replication_fetcher.rs index 64feeae1fd..137fc94288 100644 --- a/sn_networking/src/replication_fetcher.rs +++ b/sn_networking/src/replication_fetcher.rs @@ -12,10 +12,14 @@ use libp2p::{ PeerId, }; use sn_protocol::{storage::RecordType, NetworkAddress, PrettyPrintRecordKey}; -use std::{ - collections::HashMap, - time::{Duration, Instant}, -}; +use std::collections::HashMap; + +#[cfg(not(target_arch = "wasm32"))] +use std::time::Instant; +#[cfg(target_arch = "wasm32")] +use wasmtimer::std::Instant; + +use tokio::time::Duration; // Max parallel fetches that can be undertaken at the same time. const MAX_PARALLEL_FETCH: usize = K_VALUE.get(); @@ -206,7 +210,7 @@ mod tests { use libp2p::{kad::RecordKey, PeerId}; use sn_protocol::{storage::RecordType, NetworkAddress}; use std::{collections::HashMap, time::Duration}; - + use tokio::time::sleep; #[tokio::test] async fn verify_max_parallel_fetches() -> Result<()> { //random peer_id @@ -235,7 +239,7 @@ mod tests { ); assert!(keys_to_fetch.is_empty()); - tokio::time::sleep(FETCH_TIMEOUT + Duration::from_secs(1)).await; + sleep(FETCH_TIMEOUT + Duration::from_secs(1)).await; // all the previous fetches should have failed and fetching next batch let keys_to_fetch = replication_fetcher.next_keys_to_fetch(); diff --git a/sn_node/Cargo.toml b/sn_node/Cargo.toml index f38d1aa718..9d40b97bfe 100644 --- a/sn_node/Cargo.toml +++ b/sn_node/Cargo.toml @@ -14,13 +14,12 @@ name = "safenode" path = "src/bin/safenode/main.rs" [features] -default=["metrics", "quic"] +default=["metrics"] local-discovery=["sn_networking/local-discovery"] otlp = ["sn_logging/otlp"] metrics = ["sn_logging/process-metrics"] network-contacts = ["sn_peers_acquisition/network-contacts"] open-metrics = ["sn_networking/open-metrics", "prometheus-client"] -quic=["sn_networking/quic"] [dependencies] assert_fs = "1.0.0" diff --git a/sn_node/tests/common/client.rs b/sn_node/tests/common/client.rs index ecb1565eb7..0ed4458500 100644 --- a/sn_node/tests/common/client.rs +++ b/sn_node/tests/common/client.rs @@ -16,9 +16,9 @@ use sn_transfers::{create_faucet_wallet, LocalWallet, NanoTokens, Transfer}; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, path::Path, - time::{Duration, Instant}, }; use tokio::sync::Mutex; +use tokio::time::{Duration, Instant}; use tracing::{error, info, warn}; /// This is a limited hard coded value as Droplet version has to contact the faucet to get the funds. diff --git a/sn_node_rpc_client/Cargo.toml b/sn_node_rpc_client/Cargo.toml index 1c5f2dad77..1b6431aafe 100644 --- a/sn_node_rpc_client/Cargo.toml +++ b/sn_node_rpc_client/Cargo.toml @@ -33,7 +33,7 @@ thiserror = "1.0.23" # # watch out updating this, protoc compiler needs to be installed on all build systems # # arm builds + musl are very problematic tonic = { version = "0.6.2" } -tokio = { version = "1.32.0", features = ["parking_lot", "rt"] } +tokio = { version = "1.32.0", features = ["rt"] } tokio-stream = { version = "~0.1.12" } tracing = { version = "~0.1.26" } tracing-core = "0.1.30" diff --git a/sn_node_rpc_client/src/lib.rs b/sn_node_rpc_client/src/lib.rs index 2a72b9b975..f53838525e 100644 --- a/sn_node_rpc_client/src/lib.rs +++ b/sn_node_rpc_client/src/lib.rs @@ -12,7 +12,7 @@ use sn_protocol::safenode_proto::{ }; use std::path::PathBuf; use std::str::FromStr; -use std::time::Duration; +use tokio::time::Duration; use tonic::Request; #[derive(Debug, Clone)] diff --git a/sn_peers_acquisition/Cargo.toml b/sn_peers_acquisition/Cargo.toml index b2de0dd164..032440eff0 100644 --- a/sn_peers_acquisition/Cargo.toml +++ b/sn_peers_acquisition/Cargo.toml @@ -11,11 +11,9 @@ repository = "https://github.com/maidsafe/safe_network" version = "0.2.4" [features] -default = ["quic"] local-discovery = [] network-contacts = ["reqwest", "tokio", "url"] -quic= [] -tcp = [] +websockets = [] [dependencies] clap = { version = "4.2.1", features = ["derive", "env"] } @@ -23,7 +21,7 @@ libp2p = { version="0.53", features = [] } rand = "0.8.5" reqwest = { version="0.11.18", default-features=false, features = ["rustls-tls"], optional = true } thiserror = "1.0.23" -tokio = { version = "1.32.0", optional = true} +tokio = { version = "1.32.0", optional = true, default-features = false} tracing = { version = "~0.1.26" } url = { version = "2.4.0", optional = true } diff --git a/sn_peers_acquisition/src/lib.rs b/sn_peers_acquisition/src/lib.rs index 70f2452cf2..b3b5c0d987 100644 --- a/sn_peers_acquisition/src/lib.rs +++ b/sn_peers_acquisition/src/lib.rs @@ -124,25 +124,24 @@ async fn get_network_contacts(args: &PeersArgs) -> Result> { pub fn parse_peer_addr(addr: &str) -> Result { // Parse valid IPv4 socket address, e.g. `1.2.3.4:1234`. if let Ok(addr) = addr.parse::() { - let multiaddr = Multiaddr::from(*addr.ip()); - #[cfg(feature = "tcp")] - // Turn the address into a `/ip4//tcp/` multiaddr. - let multiaddr = multiaddr.with(Protocol::Tcp(addr.port())); - #[cfg(feature = "quic")] - // Turn the address into a `/ip4//udp//quic-v1` multiaddr. - let multiaddr = multiaddr + let start_addr = Multiaddr::from(*addr.ip()); + // Start with an address into a `/ip4//udp//quic-v1` multiaddr. + let multiaddr = start_addr .with(Protocol::Udp(addr.port())) .with(Protocol::QuicV1); - #[cfg(target_arch = "wasm32")] - // Turn the address into a `/ip4//udp//webtransport-websys-v1` multiaddr. - let multiaddr = multiaddr - .with(Protocol::Udp(addr.port())) - .with(Protocol::WebTransport); + + #[cfg(all(feature = "websockets", feature = "wasm32"))] + // Turn the address into a `/ip4//udp//websocket-websys-v1` multiaddr. + let multiaddr = start_addr + .with(Protocol::Tcp(addr.port())) + .with(Protocol::Ws("/".into())); + return Ok(multiaddr); } // Parse any valid multiaddr string if let Ok(addr) = addr.parse::() { + debug!("Parsing a full multiaddr: {:?}", addr); return Ok(addr); } diff --git a/sn_protocol/Cargo.toml b/sn_protocol/Cargo.toml index accbd5f01d..4ae3b0a36a 100644 --- a/sn_protocol/Cargo.toml +++ b/sn_protocol/Cargo.toml @@ -10,10 +10,9 @@ repository = "https://github.com/maidsafe/safe_network" version = "0.11.0" [features] -default = ["quic"] +default = [] test-utils=[] -quic=[] -tcp=[] +websockets=[] rpc=["tonic", "prost"] [dependencies] @@ -48,11 +47,3 @@ tonic-build = { version = "~0.6.2" } [lints] workspace = true - - - -# wasm compilation -[lib] -crate-type = ["cdylib", "rlib"] -[target.'cfg(target_arch = "wasm32")'.dependencies] -getrandom = { version = "0.2.12", features = ["js"] } diff --git a/sn_protocol/src/node_registry.rs b/sn_protocol/src/node_registry.rs index 4b58c97ea8..c35b15d8a7 100644 --- a/sn_protocol/src/node_registry.rs +++ b/sn_protocol/src/node_registry.rs @@ -113,14 +113,17 @@ pub struct Node { impl Node { pub fn get_multiaddr(&self) -> Option { if let Some(peer_id) = self.peer_id { - let addr = Multiaddr::from(std::net::Ipv4Addr::LOCALHOST); - - #[cfg(feature = "tcp")] - let addr = addr.with(libp2p::multiaddr::Protocol::Tcp(self.port)); - #[cfg(feature = "quic")] - let addr = addr - .with(libp2p::multiaddr::Protocol::Udp(self.port)) - .with(libp2p::multiaddr::Protocol::QuicV1); + let start_addr = Multiaddr::from(std::net::Ipv4Addr::LOCALHOST); + // default + let addr = if cfg!(any(feature = "websockets", target_arch = "wasm32")) { + start_addr + .with(libp2p::multiaddr::Protocol::Tcp(self.port)) + .with(libp2p::multiaddr::Protocol::Ws("/".into())) + } else { + start_addr + .with(libp2p::multiaddr::Protocol::Udp(self.port)) + .with(libp2p::multiaddr::Protocol::QuicV1) + }; let peer = addr.with(libp2p::multiaddr::Protocol::P2p(peer_id)); diff --git a/sn_testnet/Cargo.toml b/sn_testnet/Cargo.toml index 5e3b3f7bf1..4089f2e2c1 100644 --- a/sn_testnet/Cargo.toml +++ b/sn_testnet/Cargo.toml @@ -12,12 +12,12 @@ version = "0.3.33" [features] # required to pass on flag to node builds -default = ["quic"] +default = [] chaos = [] statemap = [] otlp = [] local-discovery = [] -quic = [] +websockets = [] network-contacts = [] open-metrics = [] @@ -45,7 +45,7 @@ libp2p-identity = { version="0.2.7", features = ["rand"] } [dependencies.tokio] version = "1.17.0" -features = ["fs", "io-util", "macros", "rt", "rt-multi-thread", "sync"] +features = ["io-util", "macros", "rt", "sync"] [build-dependencies] # watch out updating this, protoc compiler needs to be installed on all build systems diff --git a/sn_testnet/src/check_testnet.rs b/sn_testnet/src/check_testnet.rs index 0d0192cc21..d9202ba336 100644 --- a/sn_testnet/src/check_testnet.rs +++ b/sn_testnet/src/check_testnet.rs @@ -187,7 +187,10 @@ pub async fn obtain_peer_id(address: SocketAddr) -> Result { // Parse node logs files and extract info for each of them fn nodes_info_from_logs(path: &Path) -> Result> { let mut nodes = BTreeMap::::new(); - let re = Regex::new(r"Node \(PID: (\d+)\) with PeerId: (.*)")?; + + // If this regex misses the pid defaults to 0 for all nodes, so the count + // will be off + let re = Regex::new(r"Process \(PID: (\d+)\) with PeerId: (.*)")?; let re_listener = Regex::new("Local node is listening on \"(.+)\"")?; diff --git a/sn_testnet/src/lib.rs b/sn_testnet/src/lib.rs index 19ab732f2d..1a8bf07261 100644 --- a/sn_testnet/src/lib.rs +++ b/sn_testnet/src/lib.rs @@ -243,11 +243,13 @@ impl Testnet { std::thread::sleep(std::time::Duration::from_millis(self.node_launch_interval)); let peer_id = self.rpc_client.obtain_peer_id(rpc_address).await?; - #[cfg(not(feature = "quic"))] - let genesis_multi_addr = format!("/ip4/127.0.0.1/tcp/{genesis_port:?}/p2p/{peer_id}"); - #[cfg(feature = "quic")] - let genesis_multi_addr = - format!("/ip4/127.0.0.1/udp/{genesis_port:?}/quic-v1/p2p/{peer_id}"); + + let genesis_multi_addr = if cfg!(any(feature = "websockets", target_arch = "wasm32")) { + format!("/ip4/127.0.0.1/tcp/{genesis_port:?}/ws/p2p/{peer_id}") + } else { + format!("/ip4/127.0.0.1/udp/{genesis_port:?}/quic-v1/p2p/{peer_id}") + }; + Ok(genesis_multi_addr) } @@ -465,8 +467,11 @@ mod test { .launch_genesis(vec!["--log-format".to_string(), "json".to_string()]) .await?; - if !cfg!(feature = "quic") { - assert_eq!(format!("/ip4/127.0.0.1/tcp/11101/p2p/{peer_id}"), multiaddr); + if cfg!(feature = "websockets") { + assert_eq!( + format!("/ip4/127.0.0.1/tcp/11101/ws/p2p/{peer_id}"), + multiaddr + ); } else { assert_eq!( format!("/ip4/127.0.0.1/udp/11101/quic-v1/p2p/{peer_id}"), @@ -539,8 +544,11 @@ mod test { .launch_genesis(vec!["--log-format".to_string(), "json".to_string()]) .await?; - if !cfg!(feature = "quic") { - assert_eq!(format!("/ip4/127.0.0.1/tcp/11101/p2p/{peer_id}"), multiaddr); + if !cfg!(feature = "websockets") { + assert_eq!( + format!("/ip4/127.0.0.1/tcp/11101/ws/p2p/{peer_id}"), + multiaddr + ); } else { assert_eq!( format!("/ip4/127.0.0.1/udp/11101/quic-v1/p2p/{peer_id}"), diff --git a/sn_testnet/src/main.rs b/sn_testnet/src/main.rs index 3bf134fb2d..2147ca2c54 100644 --- a/sn_testnet/src/main.rs +++ b/sn_testnet/src/main.rs @@ -275,8 +275,8 @@ fn build_binaries(binaries_to_build: &[String]) -> Result<()> { if cfg!(feature = "network-contacts") { args.extend(["--features", "network-contacts"]); } - if cfg!(feature = "quic") { - args.extend(["--features", "quic"]); + if cfg!(feature = "websockets") { + args.extend(["--features", "websockets"]); } if cfg!(feature = "open-metrics") { args.extend(["--features", "open-metrics"]); diff --git a/sn_transfers/Cargo.toml b/sn_transfers/Cargo.toml index 6b678268e8..84e621761a 100644 --- a/sn_transfers/Cargo.toml +++ b/sn_transfers/Cargo.toml @@ -14,7 +14,6 @@ version = "0.14.41" bls = { package = "blsttc", version = "8.0.1" } custom_debug = "~0.5.0" dirs-next = "~2.0.0" -fs2 = "0.4.3" hex = "~0.4.3" lazy_static = "~1.4.0" rand = { version = "~0.8.5", features = ["small_rng"] } @@ -33,6 +32,10 @@ criterion = "0.4.0" assert_fs = "1.0.0" eyre = "0.6.8" + +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +fs2 = "0.4.3" + [target."cfg(unix)".dev-dependencies.pprof] version = "0.11.0" features = [ "flamegraph" ]