Skip to content

Commit

Permalink
Fix event generation to listen in multiple nodes (#1268)
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez authored Oct 18, 2024
1 parent 6e49211 commit cdd1dfa
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 22 deletions.
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "freenet"
version = "0.0.7"
edition = "2021"
rust-version = "1.71.1"
rust-version = "1.80"
publish = true
description = "Freenet core software"
license = "MIT OR Apache-2.0"
Expand Down
6 changes: 5 additions & 1 deletion crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,11 @@ pub(crate) mod test {
async move {
loop {
let message = {
let mut lock = ws_client_clone.lock().await;
let mut lock = ws_client_clone.try_lock().inspect_err(|_| {
tracing::error!(peer = %self.id, "failed to lock ws client");
}).inspect(|_| {
tracing::debug!(peer = %self.id, "locked ws client");
}).unwrap();
lock.next().await
};

Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,8 @@ impl ConfigArgs {
if self.config_paths.data_dir.is_none() {
self.config_paths.data_dir = Some(data);
}
Self::read_config(&config)?.map(|cfg| {
Self::read_config(&config)?.inspect(|_| {
tracing::info!("Found configuration file in default directory");
cfg
})
};

Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,8 @@ impl ExecutorToEventLoopChannel<ExecutorHalve> {
{
let op = message.initiate_op(&self.op_manager);
let tx = *op.id();
self.end.waiting_for_op_tx.send(tx).await.map_err(|e| {
self.end.waiting_for_op_tx.send(tx).await.inspect_err(|_| {
tracing::debug!("failed to send request to executor, channel closed");
e
})?;
<T as ComposeNetworkMessage<Op>>::resume_op(op, &self.op_manager)
.await
Expand Down
2 changes: 0 additions & 2 deletions crates/core/src/node/network_bridge/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1786,8 +1786,6 @@ mod tests {
};

let peer_inbound = async {
let mut conn_count = 0;

let event =
tokio::time::timeout(Duration::from_secs(1), handler.wait_for_events()).await??;
let _conn = match event {
Expand Down
15 changes: 14 additions & 1 deletion crates/core/src/node/testing_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use either::Either;
use freenet_stdlib::prelude::*;
use futures::Future;
use rand::seq::SliceRandom;
use tokio::sync::{mpsc, watch};
use tokio::sync::{broadcast, mpsc, watch};
use tracing::{info, Instrument};

#[cfg(feature = "trace-ot")]
Expand Down Expand Up @@ -205,6 +205,19 @@ impl EventSender for watch::Sender<(EventId, TransportPublicKey)> {
}
}

impl EventSender for broadcast::Sender<(EventId, TransportPublicKey)> {
fn send(
&self,
_cx: &mut std::task::Context<'_>,
value: (EventId, TransportPublicKey),
) -> std::task::Poll<Result<(), ()>> {
match self.send(value) {
Ok(_) => std::task::Poll::Ready(Ok(())),
Err(_) => std::task::Poll::Ready(Err(())),
}
}
}

impl<S: EventSender> futures::stream::Stream for EventChain<S> {
type Item = EventId;

Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/tracing/aof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,8 @@ impl LogFile {
let deserialized_records = tokio::task::spawn_blocking(move || {
let mut filtered = vec![];
for buf in records {
let record: NetLogMessage = bincode::deserialize(&buf).map_err(|e| {
let record: NetLogMessage = bincode::deserialize(&buf).inspect_err(|_| {
tracing::error!(?buf, "deserialization error");
e
})?;
// tracing::info!(?record);
if let EventKind::Route(outcome) = record.kind {
Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/transport/connection_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1639,9 +1639,8 @@ mod test {
}
let mut test_no = 0;
while let Some(result) = tests.next().await {
result?.map_err(|e| {
result?.inspect_err(|_| {
tracing::error!(%test_no, "error in test");
e
})?;
test_no += 1;
}
Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/wasm_runtime/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,10 @@ fn compact_index_file<S: StoreFsManagement>(key_file_path: &Path) -> std::io::Re

// Read the original file and compact data into the temp file
let mut original_reader = BufReader::new(original_file);
let mut temp_writer = SafeWriter::<S>::new(&temp_file_path, true).map_err(|e| {
let mut temp_writer = SafeWriter::<S>::new(&temp_file_path, true).inspect_err(|_| {
if let Err(e) = fs::remove_file(&lock_file_path) {
eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
}
e
})?;

let mut any_deleted = false; // Track if any deleted records were found
Expand Down
13 changes: 6 additions & 7 deletions crates/fdev/src/testing/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ async fn config_handler(
}

async fn ws_handler(ws: WebSocketUpgrade, supervisor: Arc<Supervisor>) -> axum::response::Response {
tracing::info!("WebSocket connection received");
let on_upgrade = move |ws: WebSocket| async move {
let cloned_supervisor = supervisor.clone();
if let Err(error) = handle_socket(ws, cloned_supervisor).await {
Expand Down Expand Up @@ -370,8 +371,8 @@ async fn handle_outgoing_messages(
supervisor: &Arc<Supervisor>,
sender: &mut SplitSink<WebSocket, Message>,
) -> anyhow::Result<()> {
let mut event_rx = supervisor.event_rx.lock().await;
while let Some((event, peer_id)) = event_rx.recv().await {
let mut event_rx = supervisor.user_ev_controller.lock().await.subscribe();
while let Ok((event, peer_id)) = event_rx.recv().await {
tracing::info!("Sending event {} to peer {}", event, peer_id);
let serialized_msg: Vec<u8> = bincode::serialize(&(event, peer_id.clone()))
.map_err(|e| anyhow!("Failed to serialize message: {}", e))?;
Expand Down Expand Up @@ -462,23 +463,21 @@ pub struct Supervisor {
processes: Mutex<HashMap<TransportPublicKey, SubProcess>>,
waiting_peers: Arc<Mutex<VecDeque<usize>>>,
waiting_gateways: Arc<Mutex<VecDeque<usize>>>,
user_ev_controller: Arc<Mutex<tokio::sync::mpsc::Sender<(u32, TransportPublicKey)>>>,
event_rx: Arc<Mutex<tokio::sync::mpsc::Receiver<(u32, TransportPublicKey)>>>,
user_ev_controller: Mutex<tokio::sync::broadcast::Sender<(u32, TransportPublicKey)>>,
}

impl Supervisor {
pub async fn new(network: &mut SimNetwork) -> Self {
let peers = network.build_peers();
let peers_config = Arc::new(Mutex::new(peers.into_iter().collect::<HashMap<_, _>>()));
let (user_ev_controller, event_rx) = tokio::sync::mpsc::channel(1);
let (user_ev_controller, _) = tokio::sync::broadcast::channel(1);

Supervisor {
peers_config,
processes: Mutex::new(HashMap::new()),
waiting_peers: Arc::new(Mutex::new(VecDeque::new())),
waiting_gateways: Arc::new(Mutex::new(VecDeque::new())),
user_ev_controller: Arc::new(Mutex::new(user_ev_controller)),
event_rx: Arc::new(Mutex::new(event_rx)),
user_ev_controller: Mutex::new(user_ev_controller),
}
}
async fn start_process(
Expand Down

0 comments on commit cdd1dfa

Please sign in to comment.