diff --git a/executor/src/light_client.rs b/executor/src/light_client.rs index 505e2142..ab08ddcc 100644 --- a/executor/src/light_client.rs +++ b/executor/src/light_client.rs @@ -27,9 +27,7 @@ export interface JsLightClientCallback { startTimer: (delay: number) => void connect: (connectionId: number, address: string, cert: Uint8Array) => void resetConnection: (connectionId: number) => void - connectionStreamOpen: (connectionId: number) => void - connectionStreamReset: (connectionId: number, streamId: number) => void - streamSend: (connectionId: number, data: Uint8Array) => void + messageSend: (connectionId: number, data: Uint8Array) => void queryResponse: (requestId: number, response: Response) => void } @@ -84,14 +82,8 @@ extern "C" { #[wasm_bindgen(structural, method, js_name = "connect")] pub fn connect(this: &JsLightClientCallback, conn_id: u32, address: String, cert: Vec); - #[wasm_bindgen(structural, method, js_name = "connectionStreamOpen")] - pub fn connection_stream_open(this: &JsLightClientCallback, conn_id: u32); - - #[wasm_bindgen(structural, method, js_name = "connectionStreamReset")] - pub fn connection_stream_reset(this: &JsLightClientCallback, conn_id: u32, stream_id: u32); - - #[wasm_bindgen(structural, method, js_name = "streamSend")] - pub fn stream_send(this: &JsLightClientCallback, conn_id: u32, data: Vec); + #[wasm_bindgen(structural, method, js_name = "messageSend")] + pub fn message_send(this: &JsLightClientCallback, conn_id: u32, data: Vec); #[wasm_bindgen(structural, method, js_name = "resetConnection")] pub fn reset_connection(this: &JsLightClientCallback, conn_id: u32); @@ -406,7 +398,7 @@ pub fn query_chain( .body .unwrap_or_default() .into_iter() - .map(|x| HexString(x)) + .map(HexString) .collect(), }) .collect::>(); @@ -445,33 +437,23 @@ pub fn query_chain( } #[wasm_bindgen] -pub fn stream_message(connection_id: u32, stream_id: u32, data: Vec) { - crate::platform::stream_message(connection_id, stream_id, data); -} - -#[wasm_bindgen] -pub fn stream_writable_bytes(connection_id: u32, stream_id: u32, num_bytes: u32) { - crate::platform::stream_writable_bytes(connection_id, stream_id, num_bytes); -} - -#[wasm_bindgen] -pub fn connection_reset(connection_id: u32, data: Vec) { - crate::platform::connection_reset(connection_id, data); +pub fn message_received(connection_id: u32, data: Vec) { + crate::platform::message_received(connection_id, data); } #[wasm_bindgen] -pub fn stream_reset(connection_id: u32, stream_id: u32, data: Vec) { - crate::platform::stream_reset(connection_id, stream_id, data); +pub fn connection_writable_bytes(connection_id: u32, num_bytes: u32) { + crate::platform::connection_writable_bytes(connection_id, num_bytes); } #[wasm_bindgen] -pub fn timer_finished(callback: JsLightClientCallback) { - crate::timers::timer_finished(Arc::new(callback)); +pub fn connection_reset(connection_id: u32) { + crate::platform::connection_reset(connection_id); } #[wasm_bindgen] -pub fn connection_stream_opened(connection_id: u32, stream_id: u32, outbound: u32) { - crate::platform::connection_stream_opened(connection_id, stream_id, outbound); +pub fn wake_up(callback: JsLightClientCallback) { + crate::timers::wake_up(Arc::new(callback)); } #[wasm_bindgen] @@ -484,7 +466,7 @@ pub fn peers_list(chain_id: usize) -> Result { .map(|(peer_id, role, best_number, best_hash)| { ( peer_id.to_string(), - format!("{:?}", role), + format!("{role:?}"), best_number, best_hash, ) diff --git a/executor/src/platform.rs b/executor/src/platform.rs index ccfbbbfb..e8f14a68 100644 --- a/executor/src/platform.rs +++ b/executor/src/platform.rs @@ -4,10 +4,7 @@ use smoldot_light::platform::{read_write, SubstreamDirection}; use std::{ borrow::Cow, collections::{BTreeMap, VecDeque}, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, Mutex, - }, + sync::{Arc, Mutex}, }; use crate::{ @@ -15,13 +12,6 @@ use crate::{ timers::Delay, }; -/// Total number of bytes that all the connections created through [`PlatformRef`] combined have -/// received. -pub static TOTAL_BYTES_RECEIVED: AtomicU64 = AtomicU64::new(0); -/// Total number of bytes that all the connections created through [`PlatformRef`] combined have -/// sent. -pub static TOTAL_BYTES_SENT: AtomicU64 = AtomicU64::new(0); - #[derive(Debug, Clone)] pub(crate) struct JsPlatform { pub callback: Arc, @@ -110,10 +100,10 @@ impl smoldot_light::platform::PlatformRef for JsPlatform { fn log<'a>( &self, - log_level: smoldot_light::platform::LogLevel, - log_target: &'a str, - message: &'a str, - key_values: impl Iterator, + _log_level: smoldot_light::platform::LogLevel, + _log_target: &'a str, + _message: &'a str, + _key_values: impl Iterator, ) { // TODO: } @@ -193,7 +183,7 @@ impl smoldot_light::platform::PlatformRef for JsPlatform { debug_assert!(_prev_value.is_none()); let _prev_value = lock.streams.insert( - (connection_id, None), + connection_id, Stream { reset: None, messages_queue: VecDeque::with_capacity(8), @@ -206,7 +196,6 @@ impl smoldot_light::platform::PlatformRef for JsPlatform { future::ready(StreamWrapper { connection_id, - stream_id: None, read_buffer: Vec::new(), inner_expected_incoming_bytes: Some(1), is_reset: None, @@ -220,178 +209,23 @@ impl smoldot_light::platform::PlatformRef for JsPlatform { fn connect_multistream( &self, - address: smoldot_light::platform::MultiStreamAddress, + _address: smoldot_light::platform::MultiStreamAddress, ) -> Self::MultiStreamConnectFuture { - let mut lock = STATE.try_lock().unwrap(); - - let connection_id = lock.next_connection_id; - lock.next_connection_id += 1; - - let mut cert = vec![]; - - let encoded_address: String = match address { - smoldot_light::platform::MultiStreamAddress::WebRtc { - ip: core::net::IpAddr::V4(ip), - port, - remote_certificate_sha256, - } => { - cert.copy_from_slice(remote_certificate_sha256); - format!("webrtc://{ip}:{port}").into() - } - smoldot_light::platform::MultiStreamAddress::WebRtc { - ip: core::net::IpAddr::V6(ip), - port, - remote_certificate_sha256, - } => { - cert.copy_from_slice(remote_certificate_sha256); - format!("webrtc://[{ip}]:{port}").into() - } - }; - - self.callback.connect(connection_id, encoded_address, cert); - - let _prev_value = lock.connections.insert( - connection_id, - Connection { - inner: ConnectionInner::MultiStreamUnknownHandshake { - opened_substreams_to_pick_up: VecDeque::with_capacity(0), - connection_handles_alive: 1, - }, - something_happened: event_listener::Event::new(), - }, - ); - debug_assert!(_prev_value.is_none()); - - let js_callback = self.callback.clone(); - - Box::pin(async move { - // Wait until the connection state is no longer "unknown handshake". - let mut lock = loop { - let something_happened = { - let mut lock = STATE.try_lock().unwrap(); - let connection = lock.connections.get_mut(&connection_id).unwrap(); - - if matches!( - connection.inner, - ConnectionInner::Reset { .. } | ConnectionInner::MultiStreamWebRtc { .. } - ) { - break lock; - } - - connection.something_happened.listen() - }; - - something_happened.await - }; - let lock = &mut *lock; - - let connection = lock.connections.get_mut(&connection_id).unwrap(); - - match &mut connection.inner { - ConnectionInner::SingleStreamMsNoiseYamux { .. } - | ConnectionInner::MultiStreamUnknownHandshake { .. } => { - unreachable!() - } - ConnectionInner::MultiStreamWebRtc { - local_tls_certificate_sha256, - .. - } => smoldot_light::platform::MultiStreamWebRtcConnection { - connection: MultiStreamWrapper(connection_id, js_callback), - local_tls_certificate_sha256: *local_tls_certificate_sha256, - }, - ConnectionInner::Reset { .. } => { - // If the connection was already reset, we proceed anyway but provide a fake - // certificate hash. This has absolutely no consequence. - smoldot_light::platform::MultiStreamWebRtcConnection { - connection: MultiStreamWrapper(connection_id, js_callback), - local_tls_certificate_sha256: [0; 32], - } - } - } - }) + unimplemented!() } fn open_out_substream( &self, - MultiStreamWrapper(connection_id, _callback): &mut Self::MultiStream, + MultiStreamWrapper(_connection_id, _callback): &mut Self::MultiStream, ) { - match STATE - .try_lock() - .unwrap() - .connections - .get(connection_id) - .unwrap() - .inner - { - ConnectionInner::MultiStreamWebRtc { .. } - | ConnectionInner::MultiStreamUnknownHandshake { .. } => { - self.callback.connection_stream_open(*connection_id); - } - ConnectionInner::Reset { .. } => {} - ConnectionInner::SingleStreamMsNoiseYamux { .. } => { - unreachable!() - } - } + unimplemented!() } fn next_substream<'a>( &self, - MultiStreamWrapper(connection_id, _callback): &'a mut Self::MultiStream, + MultiStreamWrapper(_connection_id, _callback): &'a mut Self::MultiStream, ) -> Self::NextSubstreamFuture<'a> { - let connection_id = *connection_id; - let callback = self.callback.clone(); - Box::pin(async move { - let (stream_id, direction) = loop { - let something_happened = { - let mut lock = STATE.try_lock().unwrap(); - let connection = lock.connections.get_mut(&connection_id).unwrap(); - - match &mut connection.inner { - ConnectionInner::Reset { .. } => return None, - ConnectionInner::MultiStreamWebRtc { - opened_substreams_to_pick_up, - connection_handles_alive, - .. - } - | ConnectionInner::MultiStreamUnknownHandshake { - opened_substreams_to_pick_up, - connection_handles_alive, - .. - } => { - if let Some((substream, direction)) = - opened_substreams_to_pick_up.pop_front() - { - *connection_handles_alive += 1; - break (substream, direction); - } - } - ConnectionInner::SingleStreamMsNoiseYamux { .. } => { - unreachable!() - } - } - - connection.something_happened.listen() - }; - - something_happened.await - }; - - Some(( - StreamWrapper { - connection_id, - stream_id: Some(stream_id), - read_buffer: Vec::new(), - inner_expected_incoming_bytes: Some(1), - is_reset: None, - writable_bytes: 0, - write_closable: false, // Note: this is currently hardcoded for WebRTC. - write_closed: false, - when_wake_up: None, - callback, - }, - direction, - )) - }) + unimplemented!() } fn read_write_access<'a>( @@ -439,10 +273,7 @@ impl smoldot_light::platform::PlatformRef for JsPlatform { loop { let listener = { let mut lock = STATE.try_lock().unwrap(); - let stream_inner = lock - .streams - .get_mut(&(stream.connection_id, stream.stream_id)) - .unwrap(); + let stream_inner = lock.streams.get_mut(&stream.connection_id).unwrap(); if let Some(msg) = &stream_inner.reset { stream.is_reset = Some(msg.clone()); @@ -533,10 +364,7 @@ impl<'a> Drop for ReadWriteAccess<'a> { fn drop(&mut self) { let mut lock = STATE.try_lock().unwrap(); - let stream_inner = lock - .streams - .get_mut(&(self.stream.connection_id, self.stream.stream_id)) - .unwrap(); + let stream_inner = lock.streams.get_mut(&self.stream.connection_id).unwrap(); if (self.read_write.read_bytes != 0 && self @@ -564,22 +392,14 @@ impl<'a> Drop for ReadWriteAccess<'a> { assert!(buffer.len() <= self.stream.writable_bytes); self.stream.writable_bytes -= buffer.len(); - // `unwrap()` is ok as there's no way that `buffer.len()` doesn't fit in a `u64`. - TOTAL_BYTES_SENT.fetch_add(u64::try_from(buffer.len()).unwrap(), Ordering::Relaxed); - if stream_inner.reset.is_none() { self.stream .callback - .stream_send(self.stream.connection_id, buffer); + .message_send(self.stream.connection_id, buffer); } } if self.read_write.write_bytes_queueable.is_none() && !self.stream.write_closed { - if stream_inner.reset.is_none() && self.stream.write_closable { - // TODO: we don't support multiple streams yet - // self.stream.callback.close_stream(self.stream.connection_id, self.stream.stream_id); - } - self.stream.write_closed = true; } } @@ -587,7 +407,6 @@ impl<'a> Drop for ReadWriteAccess<'a> { pub struct StreamWrapper { pub connection_id: u32, - pub stream_id: Option, pub read_buffer: Vec, pub inner_expected_incoming_bytes: Option, /// `Some` if the remote has reset the stream and `update_stream` has since then been called. @@ -608,53 +427,7 @@ impl Drop for StreamWrapper { fn drop(&mut self) { let mut lock = STATE.try_lock().unwrap(); let lock = &mut *lock; - - let connection = lock.connections.get_mut(&self.connection_id).unwrap(); - let removed_stream = lock - .streams - .remove(&(self.connection_id, self.stream_id)) - .unwrap(); - - let remove_connection = match &mut connection.inner { - ConnectionInner::SingleStreamMsNoiseYamux { .. } => { - if removed_stream.reset.is_none() { - self.callback.reset_connection(self.connection_id); - } - - debug_assert!(self.stream_id.is_none()); - true - } - ConnectionInner::MultiStreamWebRtc { - connection_handles_alive, - .. - } - | ConnectionInner::MultiStreamUnknownHandshake { - connection_handles_alive, - .. - } => { - if removed_stream.reset.is_none() { - self.callback - .connection_stream_reset(self.connection_id, self.stream_id.unwrap()); - } - *connection_handles_alive -= 1; - let remove_connection = *connection_handles_alive == 0; - if remove_connection { - self.callback.reset_connection(self.connection_id) - } - remove_connection - } - ConnectionInner::Reset { - connection_handles_alive, - .. - } => { - *connection_handles_alive -= 1; - *connection_handles_alive == 0 - } - }; - - if remove_connection { - lock.connections.remove(&self.connection_id).unwrap(); - } + lock.connections.remove(&self.connection_id).unwrap(); } } @@ -669,19 +442,7 @@ impl Drop for MultiStreamWrapper { ConnectionInner::SingleStreamMsNoiseYamux { .. } => { unreachable!() } - ConnectionInner::MultiStreamWebRtc { - connection_handles_alive, - .. - } - | ConnectionInner::MultiStreamUnknownHandshake { - connection_handles_alive, - .. - } => { - *connection_handles_alive -= 1; - let v = *connection_handles_alive == 0; - (v, v) - } - ConnectionInner::Reset { .. } => (true, false), + ConnectionInner::Reset => (true, false), }; if remove_connection { @@ -714,15 +475,11 @@ impl core::hash::BuildHasher for FnvBuildHasher { } } -/// All the connections and streams that are alive. -/// -/// Single-stream connections have one entry in `connections` and one entry in `streams` (with -/// a `stream_id` always equal to `None`). -/// Multi-stream connections have one entry in `connections` and zero or more entries in `streams`. +/// All the connections that are alive. struct NetworkState { next_connection_id: u32, connections: hashbrown::HashMap, - streams: BTreeMap<(u32, Option), Stream>, + streams: BTreeMap, } #[derive(Debug)] @@ -736,35 +493,8 @@ struct Connection { #[derive(Debug)] enum ConnectionInner { SingleStreamMsNoiseYamux, - MultiStreamUnknownHandshake { - /// List of substreams that the host (i.e. JavaScript side) has reported have been opened, - /// but that haven't been reported through - /// [`smoldot_light::platform::PlatformRef::next_substream`] yet. - opened_substreams_to_pick_up: VecDeque<(u32, SubstreamDirection)>, - /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference - /// this connection. If it switches from 1 to 0, the connection must be removed. - connection_handles_alive: u32, - }, - MultiStreamWebRtc { - /// List of substreams that the host (i.e. JavaScript side) has reported have been opened, - /// but that haven't been reported through - /// [`smoldot_light::platform::PlatformRef::next_substream`] yet. - opened_substreams_to_pick_up: VecDeque<(u32, SubstreamDirection)>, - /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference - /// this connection. If it switches from 1 to 0, the connection must be removed. - connection_handles_alive: u32, - /// SHA256 hash of the TLS certificate used by the local node at the DTLS layer. - local_tls_certificate_sha256: [u8; 32], - }, /// [`bindings::connection_reset`] has been called - Reset { - /// Message given by the bindings to justify the closure. - // TODO: why is this unused? shouldn't it be not unused? - _message: String, - /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference - /// this connection. If it switches from 1 to 0, the connection must be removed. - connection_handles_alive: u32, - }, + Reset, } struct Stream { @@ -783,23 +513,13 @@ struct Stream { something_happened: event_listener::Event, } -pub(crate) fn stream_writable_bytes(connection_id: u32, stream_id: u32, bytes: u32) { +pub(crate) fn connection_writable_bytes(connection_id: u32, bytes: u32) { let mut lock = STATE.try_lock().unwrap(); - let connection = lock.connections.get_mut(&connection_id).unwrap(); - - // For single stream connections, the docs of this function mentions that `stream_id` can be - // any value. - let actual_stream_id = match connection.inner { - ConnectionInner::MultiStreamWebRtc { .. } - | ConnectionInner::MultiStreamUnknownHandshake { .. } => Some(stream_id), - ConnectionInner::SingleStreamMsNoiseYamux { .. } => None, - ConnectionInner::Reset { .. } => unreachable!(), - }; + if lock.connections.get_mut(&connection_id).is_none() { + return; + } - let stream = lock - .streams - .get_mut(&(connection_id, actual_stream_id)) - .unwrap(); + let stream = lock.streams.get_mut(&connection_id).unwrap(); debug_assert!(stream.reset.is_none()); // As documented, the number of writable bytes must never become exceedingly large (a few @@ -808,36 +528,19 @@ pub(crate) fn stream_writable_bytes(connection_id: u32, stream_id: u32, bytes: u stream.something_happened.notify(usize::MAX); } -pub fn stream_message(connection_id: u32, stream_id: u32, message: Vec) { +pub fn message_received(connection_id: u32, message: Vec) { + if message.is_empty() { + return; + } let mut lock = STATE.try_lock().unwrap(); - let connection = lock.connections.get_mut(&connection_id); - if connection.is_none() { + // ensure connection is active + if lock.connections.get_mut(&connection_id).is_none() { return; } - let connection = connection.unwrap(); - - // For single stream connections, the docs of this function mentions that `stream_id` can be - // any value. - let actual_stream_id = match connection.inner { - ConnectionInner::MultiStreamWebRtc { .. } - | ConnectionInner::MultiStreamUnknownHandshake { .. } => Some(stream_id), - ConnectionInner::SingleStreamMsNoiseYamux { .. } => None, - ConnectionInner::Reset { .. } => unreachable!(), - }; - let stream = lock - .streams - .get_mut(&(connection_id, actual_stream_id)) - .unwrap(); + let stream = lock.streams.get_mut(&connection_id).unwrap(); debug_assert!(stream.reset.is_none()); - TOTAL_BYTES_RECEIVED.fetch_add(u64::try_from(message.len()).unwrap(), Ordering::Relaxed); - - // Ignore empty message to avoid all sorts of problems. - if message.is_empty() { - return; - } - // There is unfortunately no way to instruct the browser to back-pressure connections to // remotes. // @@ -870,11 +573,7 @@ pub fn stream_message(connection_id: u32, stream_id: u32, message: Vec) { stream.something_happened.notify(usize::MAX); } -pub fn connection_reset(connection_id: u32, message: Vec) { - let message = str::from_utf8(&message) - .unwrap_or_else(|_| panic!("non-UTF-8 message")) - .to_owned(); - +pub fn connection_reset(connection_id: u32) { let mut lock = STATE.try_lock().unwrap(); let connection = lock.connections.get_mut(&connection_id); let connection = match connection { @@ -882,91 +581,12 @@ pub fn connection_reset(connection_id: u32, message: Vec) { None => return, }; - let connection_handles_alive = match &connection.inner { - ConnectionInner::SingleStreamMsNoiseYamux { .. } => 1, // TODO: I believe that this is correct but a bit confusing; might be helpful to refactor with an enum or something - ConnectionInner::MultiStreamWebRtc { - connection_handles_alive, - .. - } - | ConnectionInner::MultiStreamUnknownHandshake { - connection_handles_alive, - .. - } => *connection_handles_alive, - ConnectionInner::Reset { .. } => unreachable!(), - }; - - connection.inner = ConnectionInner::Reset { - connection_handles_alive, - _message: message.clone(), - }; + connection.inner = ConnectionInner::Reset; connection.something_happened.notify(usize::MAX); - for ((_, _), stream) in lock - .streams - .range_mut((connection_id, Some(u32::MAX))..=(connection_id, Some(u32::MAX))) - { - stream.reset = Some(message.clone()); - stream.something_happened.notify(usize::MAX); - } - if let Some(stream) = lock.streams.get_mut(&(connection_id, None)) { - stream.reset = Some(message); + if let Some(stream) = lock.streams.get_mut(&connection_id) { + stream.reset = Some("connection reset".into()); stream.something_happened.notify(usize::MAX); } } - -pub(crate) fn connection_stream_opened(connection_id: u32, stream_id: u32, outbound: u32) { - let mut lock = STATE.try_lock().unwrap(); - let lock = &mut *lock; - - let connection = lock.connections.get_mut(&connection_id).unwrap(); - if let ConnectionInner::MultiStreamWebRtc { - opened_substreams_to_pick_up, - .. - } = &mut connection.inner - { - let _prev_value = lock.streams.insert( - (connection_id, Some(stream_id)), - Stream { - reset: None, - messages_queue: VecDeque::with_capacity(8), - messages_queue_total_size: 0, - something_happened: event_listener::Event::new(), - writable_bytes_extra: 0, - }, - ); - - if _prev_value.is_some() { - panic!("same stream_id used multiple times in connection_stream_opened") - } - - opened_substreams_to_pick_up.push_back(( - stream_id, - if outbound != 0 { - SubstreamDirection::Outbound - } else { - SubstreamDirection::Inbound - }, - )); - - connection.something_happened.notify(usize::MAX); - } else { - panic!() - } -} - -pub fn stream_reset(connection_id: u32, stream_id: u32, message: Vec) { - let message: String = str::from_utf8(&message) - .unwrap_or_else(|_| panic!("non-UTF-8 message")) - .to_owned(); - - // Note that, as documented, it is illegal to call this function on single-stream substreams. - // We can thus assume that the `stream_id` is valid. - let mut lock = STATE.try_lock().unwrap(); - let stream = lock - .streams - .get_mut(&(connection_id, Some(stream_id))) - .unwrap(); - stream.reset = Some(message); - stream.something_happened.notify(usize::MAX); -} diff --git a/executor/src/proof.rs b/executor/src/proof.rs index 175879a4..42766e69 100644 --- a/executor/src/proof.rs +++ b/executor/src/proof.rs @@ -261,10 +261,7 @@ fn create_proof_works() { let (hash, nodes) = create_proof(get_nodes(), updates).unwrap(); let decoded = decode_proof(hash, nodes.iter().map(|x| x.0.clone()).collect::>()).unwrap(); - assert!(decoded - .iter() - .find(|(key, _)| key == &dmq_mqc_head) - .is_none()); + assert!(!decoded.iter().any(|(key, _)| key == &dmq_mqc_head)); // current_slot is not changed let (_, value) = decoded diff --git a/executor/src/timers.rs b/executor/src/timers.rs index 7474b808..f5cbc00a 100644 --- a/executor/src/timers.rs +++ b/executor/src/timers.rs @@ -1,27 +1,3 @@ -// Smoldot -// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -//! This module provides the `Delay` struct, which implement `Future` and becomes ready after a -//! certain time. -//! -//! In order to optimize performances, we avoid invoking the FFI once per timer. Instead, the FFI -//! is only used in order to wake up when the earliest timer finishes, then restarted for the next -//! timer. - use core::{ cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd}, future, mem, @@ -34,7 +10,7 @@ use std::{collections::BTreeSet, sync::Mutex}; use crate::light_client::{monotonic_clock_us, JsLightClientCallback}; -pub(crate) fn timer_finished(js_callback: Arc) { +pub(crate) fn wake_up(js_callback: Arc) { process_timers(js_callback); } diff --git a/packages/core/src/wasm-executor/browser-wasm-executor.js b/packages/core/src/wasm-executor/browser-wasm-executor.js index 9f4f9928..c84df974 100644 --- a/packages/core/src/wasm-executor/browser-wasm-executor.js +++ b/packages/core/src/wasm-executor/browser-wasm-executor.js @@ -30,28 +30,20 @@ const startNetworkService = async (config, callback) => { return pkg.start_network_service(config, callback) } -const connectionStreamOpened = async (connectionId, streamId, outbound) => { - return pkg.connection_stream_opened(connectionId, streamId, outbound) +const connectionReset = async (connectionId) => { + return pkg.connection_reset(connectionId) } -const connectionReset = async (connectionId, data) => { - return pkg.connection_reset(connectionId, data) +const messageRecieved = async (connectionId, data) => { + return pkg.message_received(connectionId, data) } -const streamReset = async (connectionId, streamId) => { - return pkg.stream_reset(connectionId, streamId) +const connectionWritableBytes = async (connectionId, bytes) => { + return pkg.connection_writable_bytes(connectionId, bytes) } -const streamMessage = async (connectionId, streamId, data) => { - return pkg.stream_message(connectionId, streamId, data) -} - -const streamWritableBytes = async (connectionId, streamId, bytes) => { - return pkg.stream_writable_bytes(connectionId, streamId, bytes) -} - -const timerFinished = async (callback) => { - return pkg.timer_finished(callback) +const wakeUp = async (callback) => { + return pkg.wake_up(callback) } const queryChain = async (chainId, requestId, request, retries, callback) => { @@ -77,12 +69,10 @@ const wasmExecutor = { queryChain, getPeers, getLatestBlock, - connectionStreamOpened, connectionReset, - streamReset, - streamMessage, - streamWritableBytes, - timerFinished, + messageRecieved, + connectionWritableBytes, + wakeUp, } Comlink.expose(wasmExecutor) diff --git a/packages/core/src/wasm-executor/index.ts b/packages/core/src/wasm-executor/index.ts index 1c3e8201..b531590b 100644 --- a/packages/core/src/wasm-executor/index.ts +++ b/packages/core/src/wasm-executor/index.ts @@ -70,12 +70,10 @@ export interface WasmExecutor { startNetworkService: (config: LightClientConfig, callback: JsLightClientCallback) => Promise getPeers: (chainId: number) => Promise<[string, string, number, string][]> getLatestBlock: (chainId: number) => Promise<[number, HexString]> - streamMessage: (connectionId: number, streamId: number, data: Uint8Array) => Promise - streamWritableBytes: (connectionId: number, streamId: number, numBytes: number) => Promise - connectionStreamOpened: (connectionId: number, streamId: number, outbound: number) => Promise - connectionReset: (connectionId: number, data: Uint8Array) => Promise - streamReset: (connectionId: number, streamId: number) => Promise - timerFinished: (callback: JsLightClientCallback) => Promise + messageRecieved: (connectionId: number, data: Uint8Array) => Promise + connectionWritableBytes: (connectionId: number, numBytes: number) => Promise + connectionReset: (connectionId: number) => Promise + wakeUp: (callback: JsLightClientCallback) => Promise queryChain: ( chainId: number, requestId: number, @@ -263,48 +261,24 @@ export const getPeers = async (chainId: number) => { .catch(() => []) } -export const streamMessage = async (connectionId: number, streamId: number, data: Uint8Array) => { +export const messageRecieved = async (connectionId: number, data: Uint8Array) => { const worker = await getWorker() - return worker.remote.streamMessage(connectionId, streamId, data) + return worker.remote.messageRecieved(connectionId, data) } -export const streamWritableBytes = async (connectionId: number, streamId: number, numBytes: number) => { +export const connectionWritableBytes = async (connectionId: number, numBytes: number) => { const worker = await getWorker() - return worker.remote.streamWritableBytes(connectionId, streamId, numBytes) + return worker.remote.connectionWritableBytes(connectionId, numBytes) } -export const connectionStreamOpened = async (connectionId: number, streamId: number, outbound: number) => { +export const connectionReset = async (connectionId: number) => { const worker = await getWorker() - return worker.remote.connectionStreamOpened(connectionId, streamId, outbound) + return worker.remote.connectionReset(connectionId) } -export const connectionReset = async (connectionId: number, data: Uint8Array) => { +export const wakeUp = async (callback: JsLightClientCallback) => { const worker = await getWorker() - return worker.remote.connectionReset(connectionId, data) -} - -export const streamReset = async (connectionId: number, streamId: number) => { - const worker = await getWorker() - return worker.remote.streamReset(connectionId, streamId) -} - -export const timerFinished = async (callback: JsLightClientCallback) => { - const worker = await getWorker() - return worker.remote.timerFinished(Comlink.proxy(callback)) -} - -export const connectionOpenSingleStream = async (_connectionId: number, _streamId: number) => { - // const worker = await getWorker() - // return worker.remote.connectionOpenSingleStream(connectionId, streamId) -} - -export const connectionOpenMultiStream = async ( - _connectionId: number, - _localCert: Uint8Array, - _remoteCert: Uint8Array, -) => { - // const worker = await getWorker() - // return worker.remote.connectionOpenMultiStream(connectionId, localCert, remoteCert) + return worker.remote.wakeUp(Comlink.proxy(callback)) } export const getLatestBlock = async (chainId: number) => { diff --git a/packages/core/src/wasm-executor/light-client.ts b/packages/core/src/wasm-executor/light-client.ts index 77e92239..09fcd0e8 100644 --- a/packages/core/src/wasm-executor/light-client.ts +++ b/packages/core/src/wasm-executor/light-client.ts @@ -1,20 +1,19 @@ import { HexString } from '@polkadot/util/types' -import { Response } from '@acala-network/chopsticks-executor' +import { JsLightClientCallback, Response } from '@acala-network/chopsticks-executor' import { WebSocket } from 'ws' -import { stringToU8a } from '@polkadot/util' globalThis.WebSocket = typeof globalThis.WebSocket !== 'undefined' ? globalThis.WebSocket : (WebSocket as any) import { Deferred, defer } from '../utils/index.js' import { connectionReset, + connectionWritableBytes, getLatestBlock, getPeers, + messageRecieved, queryChain, startNetworkService, - streamMessage, - streamWritableBytes, - timerFinished, + wakeUp, } from './index.js' import { defaultLogger } from '../logger.js' @@ -79,7 +78,7 @@ export type LightClientConfig = { bootnodes: string[] } -export class LightClient { +export class LightClient implements JsLightClientCallback { #requestId = 1 // blacklist of addresses that we have failed to connect to #blacklist: string[] = [] @@ -105,7 +104,7 @@ export class LightClient { connect(connId: number, address: string, _cert: Uint8Array) { if (this.#blacklist.includes(address)) { - connectionReset(connId, new Uint8Array(0)) + connectionReset(connId) return } @@ -126,26 +125,26 @@ export class LightClient { if (!connection.destroyed) { connection.destroyed = true self.#connections.delete(connId) - connectionReset(connId, stringToU8a(error['message'] || '')) + connectionReset(connId) } } connection.onMessage = function (ws, data) { if (connection.destroyed) return if (ws.readyState != 1) return - streamMessage(connId, 0, data) + messageRecieved(connId, data) } - connection.onClose = function (_ws, event) { + connection.onClose = function () { if (!connection.destroyed) { connection.destroyed = true self.#connections.delete(connId) - connectionReset(connId, stringToU8a(event.reason)) + connectionReset(connId) } } connection.onOpen = function () { - streamWritableBytes(connId, 0, 1024 * 1024) + connectionWritableBytes(connId, 1024 * 1024) } } @@ -154,7 +153,7 @@ export class LightClient { this.#queryResponse.delete(requestId) } - streamSend(connectionId: number, data: Uint8Array) { + messageSend(connectionId: number, data: Uint8Array) { const connection = this.#connections.get(connectionId) if (!connection) { this.resetConnection(connectionId) @@ -188,7 +187,7 @@ export class LightClient { if (ms == 0 && typeof setImmediate === 'function') { setImmediate(() => { try { - timerFinished(this) + wakeUp(this) } catch (_e) { _e } @@ -196,7 +195,7 @@ export class LightClient { } else { setTimeout(() => { try { - timerFinished(this) + wakeUp(this) } catch (_e) { _e } @@ -260,11 +259,6 @@ export class LightClient { throw new Error('Invalid response') } - // TODO: webrtc - connectionStreamOpen(_connectionId: number) {} - // TODO: webrtc - connectionStreamReset(_connectionId: number, _streamId: number) {} - async getPeers() { const chainId = await this.#chainId.promise return getPeers(chainId) diff --git a/packages/core/src/wasm-executor/node-wasm-executor.js b/packages/core/src/wasm-executor/node-wasm-executor.js index 264acb22..b9043ceb 100644 --- a/packages/core/src/wasm-executor/node-wasm-executor.js +++ b/packages/core/src/wasm-executor/node-wasm-executor.js @@ -33,28 +33,20 @@ const startNetworkService = async (config, callback) => { return pkg.start_network_service(config, callback) } -const connectionStreamOpened = async (connectionId, streamId, outbound) => { - return pkg.connection_stream_opened(connectionId, streamId, outbound) +const connectionReset = async (connectionId) => { + return pkg.connection_reset(connectionId) } -const connectionReset = async (connectionId, data) => { - return pkg.connection_reset(connectionId, data) +const messageRecieved = async (connectionId, data) => { + return pkg.message_received(connectionId, data) } -const streamReset = async (connectionId, streamId) => { - return pkg.stream_reset(connectionId, streamId) +const connectionWritableBytes = async (connectionId, bytes) => { + return pkg.connection_writable_bytes(connectionId, bytes) } -const streamMessage = async (connectionId, streamId, data) => { - return pkg.stream_message(connectionId, streamId, data) -} - -const streamWritableBytes = async (connectionId, streamId, bytes) => { - return pkg.stream_writable_bytes(connectionId, streamId, bytes) -} - -const timerFinished = async (callback) => { - return pkg.timer_finished(callback) +const wakeUp = async (callback) => { + return pkg.wake_up(callback) } const queryChain = async (chainId, requestId, request, retries, callback) => { @@ -80,12 +72,10 @@ const wasmExecutor = { queryChain, getPeers, getLatestBlock, - connectionStreamOpened, connectionReset, - streamReset, - streamMessage, - streamWritableBytes, - timerFinished, + messageRecieved, + connectionWritableBytes, + wakeUp, } Comlink.expose(wasmExecutor, nodeEndpoint(parentPort))