From 6d6d42f1e29dae039efa212f6147912c1594928b Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Fri, 18 Oct 2024 14:01:42 +0200 Subject: [PATCH 1/2] Fix event generation to listen in multiple nodes --- crates/core/Cargo.toml | 2 +- crates/core/src/client_events.rs | 6 +++++- crates/core/src/config.rs | 3 +-- crates/core/src/contract/executor.rs | 3 +-- crates/core/src/node/testing_impl.rs | 15 ++++++++++++++- crates/fdev/src/testing/network.rs | 13 ++++++------- 6 files changed, 28 insertions(+), 14 deletions(-) diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index c337bda89..bf95ee7c6 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -2,7 +2,7 @@ name = "freenet" version = "0.0.7" edition = "2021" -rust-version = "1.71.1" +rust-version = "1.80" publish = true description = "Freenet core software" license = "MIT OR Apache-2.0" diff --git a/crates/core/src/client_events.rs b/crates/core/src/client_events.rs index 9b302189f..674995f01 100644 --- a/crates/core/src/client_events.rs +++ b/crates/core/src/client_events.rs @@ -360,7 +360,11 @@ pub(crate) mod test { async move { loop { let message = { - let mut lock = ws_client_clone.lock().await; + let mut lock = ws_client_clone.try_lock().inspect_err(|_| { + tracing::error!(peer = %self.id, "failed to lock ws client"); + }).inspect(|_| { + tracing::debug!(peer = %self.id, "locked ws client"); + }).unwrap(); lock.next().await }; diff --git a/crates/core/src/config.rs b/crates/core/src/config.rs index 2fd5872a8..7f90315e5 100644 --- a/crates/core/src/config.rs +++ b/crates/core/src/config.rs @@ -190,9 +190,8 @@ impl ConfigArgs { if self.config_paths.data_dir.is_none() { self.config_paths.data_dir = Some(data); } - Self::read_config(&config)?.map(|cfg| { + Self::read_config(&config)?.inspect(|_| { tracing::info!("Found configuration file in default directory"); - cfg }) }; diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index 37214b4e0..36109e38c 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -231,9 +231,8 @@ impl ExecutorToEventLoopChannel { { let op = message.initiate_op(&self.op_manager); let tx = *op.id(); - self.end.waiting_for_op_tx.send(tx).await.map_err(|e| { + self.end.waiting_for_op_tx.send(tx).await.inspect_err(|_| { tracing::debug!("failed to send request to executor, channel closed"); - e })?; >::resume_op(op, &self.op_manager) .await diff --git a/crates/core/src/node/testing_impl.rs b/crates/core/src/node/testing_impl.rs index fa9a342d2..be8c9ad81 100644 --- a/crates/core/src/node/testing_impl.rs +++ b/crates/core/src/node/testing_impl.rs @@ -11,7 +11,7 @@ use either::Either; use freenet_stdlib::prelude::*; use futures::Future; use rand::seq::SliceRandom; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{broadcast, mpsc, watch}; use tracing::{info, Instrument}; #[cfg(feature = "trace-ot")] @@ -205,6 +205,19 @@ impl EventSender for watch::Sender<(EventId, TransportPublicKey)> { } } +impl EventSender for broadcast::Sender<(EventId, TransportPublicKey)> { + fn send( + &self, + _cx: &mut std::task::Context<'_>, + value: (EventId, TransportPublicKey), + ) -> std::task::Poll> { + match self.send(value) { + Ok(_) => std::task::Poll::Ready(Ok(())), + Err(_) => std::task::Poll::Ready(Err(())), + } + } +} + impl futures::stream::Stream for EventChain { type Item = EventId; diff --git a/crates/fdev/src/testing/network.rs b/crates/fdev/src/testing/network.rs index 8cf6b985c..ab4645d7f 100644 --- a/crates/fdev/src/testing/network.rs +++ b/crates/fdev/src/testing/network.rs @@ -308,6 +308,7 @@ async fn config_handler( } async fn ws_handler(ws: WebSocketUpgrade, supervisor: Arc) -> axum::response::Response { + tracing::info!("WebSocket connection received"); let on_upgrade = move |ws: WebSocket| async move { let cloned_supervisor = supervisor.clone(); if let Err(error) = handle_socket(ws, cloned_supervisor).await { @@ -370,8 +371,8 @@ async fn handle_outgoing_messages( supervisor: &Arc, sender: &mut SplitSink, ) -> anyhow::Result<()> { - let mut event_rx = supervisor.event_rx.lock().await; - while let Some((event, peer_id)) = event_rx.recv().await { + let mut event_rx = supervisor.user_ev_controller.lock().await.subscribe(); + while let Ok((event, peer_id)) = event_rx.recv().await { tracing::info!("Sending event {} to peer {}", event, peer_id); let serialized_msg: Vec = bincode::serialize(&(event, peer_id.clone())) .map_err(|e| anyhow!("Failed to serialize message: {}", e))?; @@ -462,23 +463,21 @@ pub struct Supervisor { processes: Mutex>, waiting_peers: Arc>>, waiting_gateways: Arc>>, - user_ev_controller: Arc>>, - event_rx: Arc>>, + user_ev_controller: Mutex>, } impl Supervisor { pub async fn new(network: &mut SimNetwork) -> Self { let peers = network.build_peers(); let peers_config = Arc::new(Mutex::new(peers.into_iter().collect::>())); - let (user_ev_controller, event_rx) = tokio::sync::mpsc::channel(1); + let (user_ev_controller, _) = tokio::sync::broadcast::channel(1); Supervisor { peers_config, processes: Mutex::new(HashMap::new()), waiting_peers: Arc::new(Mutex::new(VecDeque::new())), waiting_gateways: Arc::new(Mutex::new(VecDeque::new())), - user_ev_controller: Arc::new(Mutex::new(user_ev_controller)), - event_rx: Arc::new(Mutex::new(event_rx)), + user_ev_controller: Mutex::new(user_ev_controller), } } async fn start_process( From 8b20b2c3befa6c682f6486c680340c520774bc07 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Fri, 18 Oct 2024 15:21:52 +0200 Subject: [PATCH 2/2] clippy --- crates/core/src/node/network_bridge/handshake.rs | 2 -- crates/core/src/tracing/aof.rs | 3 +-- crates/core/src/transport/connection_handler.rs | 3 +-- crates/core/src/wasm_runtime/store.rs | 3 +-- 4 files changed, 3 insertions(+), 8 deletions(-) diff --git a/crates/core/src/node/network_bridge/handshake.rs b/crates/core/src/node/network_bridge/handshake.rs index f8ab6eb5c..70a11e347 100644 --- a/crates/core/src/node/network_bridge/handshake.rs +++ b/crates/core/src/node/network_bridge/handshake.rs @@ -1786,8 +1786,6 @@ mod tests { }; let peer_inbound = async { - let mut conn_count = 0; - let event = tokio::time::timeout(Duration::from_secs(1), handler.wait_for_events()).await??; let _conn = match event { diff --git a/crates/core/src/tracing/aof.rs b/crates/core/src/tracing/aof.rs index b4f6c4e0a..b23647895 100644 --- a/crates/core/src/tracing/aof.rs +++ b/crates/core/src/tracing/aof.rs @@ -339,9 +339,8 @@ impl LogFile { let deserialized_records = tokio::task::spawn_blocking(move || { let mut filtered = vec![]; for buf in records { - let record: NetLogMessage = bincode::deserialize(&buf).map_err(|e| { + let record: NetLogMessage = bincode::deserialize(&buf).inspect_err(|_| { tracing::error!(?buf, "deserialization error"); - e })?; // tracing::info!(?record); if let EventKind::Route(outcome) = record.kind { diff --git a/crates/core/src/transport/connection_handler.rs b/crates/core/src/transport/connection_handler.rs index e190513c6..458d6cecc 100644 --- a/crates/core/src/transport/connection_handler.rs +++ b/crates/core/src/transport/connection_handler.rs @@ -1639,9 +1639,8 @@ mod test { } let mut test_no = 0; while let Some(result) = tests.next().await { - result?.map_err(|e| { + result?.inspect_err(|_| { tracing::error!(%test_no, "error in test"); - e })?; test_no += 1; } diff --git a/crates/core/src/wasm_runtime/store.rs b/crates/core/src/wasm_runtime/store.rs index 8b4ac21d0..4d161b29f 100644 --- a/crates/core/src/wasm_runtime/store.rs +++ b/crates/core/src/wasm_runtime/store.rs @@ -323,11 +323,10 @@ fn compact_index_file(key_file_path: &Path) -> std::io::Re // Read the original file and compact data into the temp file let mut original_reader = BufReader::new(original_file); - let mut temp_writer = SafeWriter::::new(&temp_file_path, true).map_err(|e| { + let mut temp_writer = SafeWriter::::new(&temp_file_path, true).inspect_err(|_| { if let Err(e) = fs::remove_file(&lock_file_path) { eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!()); } - e })?; let mut any_deleted = false; // Track if any deleted records were found