Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix event generation to listen in multiple nodes #1268

Merged
merged 2 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "freenet"
version = "0.0.7"
edition = "2021"
rust-version = "1.71.1"
rust-version = "1.80"
publish = true
description = "Freenet core software"
license = "MIT OR Apache-2.0"
Expand Down
6 changes: 5 additions & 1 deletion crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,11 @@ pub(crate) mod test {
async move {
loop {
let message = {
let mut lock = ws_client_clone.lock().await;
let mut lock = ws_client_clone.try_lock().inspect_err(|_| {
tracing::error!(peer = %self.id, "failed to lock ws client");
}).inspect(|_| {
tracing::debug!(peer = %self.id, "locked ws client");
}).unwrap();
lock.next().await
};

Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,8 @@ impl ConfigArgs {
if self.config_paths.data_dir.is_none() {
self.config_paths.data_dir = Some(data);
}
Self::read_config(&config)?.map(|cfg| {
Self::read_config(&config)?.inspect(|_| {
tracing::info!("Found configuration file in default directory");
cfg
})
};

Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,8 @@ impl ExecutorToEventLoopChannel<ExecutorHalve> {
{
let op = message.initiate_op(&self.op_manager);
let tx = *op.id();
self.end.waiting_for_op_tx.send(tx).await.map_err(|e| {
self.end.waiting_for_op_tx.send(tx).await.inspect_err(|_| {
tracing::debug!("failed to send request to executor, channel closed");
e
})?;
<T as ComposeNetworkMessage<Op>>::resume_op(op, &self.op_manager)
.await
Expand Down
2 changes: 0 additions & 2 deletions crates/core/src/node/network_bridge/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1786,8 +1786,6 @@ mod tests {
};

let peer_inbound = async {
let mut conn_count = 0;

let event =
tokio::time::timeout(Duration::from_secs(1), handler.wait_for_events()).await??;
let _conn = match event {
Expand Down
15 changes: 14 additions & 1 deletion crates/core/src/node/testing_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use either::Either;
use freenet_stdlib::prelude::*;
use futures::Future;
use rand::seq::SliceRandom;
use tokio::sync::{mpsc, watch};
use tokio::sync::{broadcast, mpsc, watch};
use tracing::{info, Instrument};

#[cfg(feature = "trace-ot")]
Expand Down Expand Up @@ -205,6 +205,19 @@ impl EventSender for watch::Sender<(EventId, TransportPublicKey)> {
}
}

impl EventSender for broadcast::Sender<(EventId, TransportPublicKey)> {
fn send(
&self,
_cx: &mut std::task::Context<'_>,
value: (EventId, TransportPublicKey),
) -> std::task::Poll<Result<(), ()>> {
match self.send(value) {
Ok(_) => std::task::Poll::Ready(Ok(())),
Err(_) => std::task::Poll::Ready(Err(())),
}
}
}

impl<S: EventSender> futures::stream::Stream for EventChain<S> {
type Item = EventId;

Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/tracing/aof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,8 @@ impl LogFile {
let deserialized_records = tokio::task::spawn_blocking(move || {
let mut filtered = vec![];
for buf in records {
let record: NetLogMessage = bincode::deserialize(&buf).map_err(|e| {
let record: NetLogMessage = bincode::deserialize(&buf).inspect_err(|_| {
tracing::error!(?buf, "deserialization error");
e
})?;
// tracing::info!(?record);
if let EventKind::Route(outcome) = record.kind {
Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/transport/connection_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1639,9 +1639,8 @@ mod test {
}
let mut test_no = 0;
while let Some(result) = tests.next().await {
result?.map_err(|e| {
result?.inspect_err(|_| {
tracing::error!(%test_no, "error in test");
e
})?;
test_no += 1;
}
Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/wasm_runtime/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,10 @@ fn compact_index_file<S: StoreFsManagement>(key_file_path: &Path) -> std::io::Re

// Read the original file and compact data into the temp file
let mut original_reader = BufReader::new(original_file);
let mut temp_writer = SafeWriter::<S>::new(&temp_file_path, true).map_err(|e| {
let mut temp_writer = SafeWriter::<S>::new(&temp_file_path, true).inspect_err(|_| {
if let Err(e) = fs::remove_file(&lock_file_path) {
eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
}
e
})?;

let mut any_deleted = false; // Track if any deleted records were found
Expand Down
13 changes: 6 additions & 7 deletions crates/fdev/src/testing/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ async fn config_handler(
}

async fn ws_handler(ws: WebSocketUpgrade, supervisor: Arc<Supervisor>) -> axum::response::Response {
tracing::info!("WebSocket connection received");
let on_upgrade = move |ws: WebSocket| async move {
let cloned_supervisor = supervisor.clone();
if let Err(error) = handle_socket(ws, cloned_supervisor).await {
Expand Down Expand Up @@ -370,8 +371,8 @@ async fn handle_outgoing_messages(
supervisor: &Arc<Supervisor>,
sender: &mut SplitSink<WebSocket, Message>,
) -> anyhow::Result<()> {
let mut event_rx = supervisor.event_rx.lock().await;
while let Some((event, peer_id)) = event_rx.recv().await {
let mut event_rx = supervisor.user_ev_controller.lock().await.subscribe();
while let Ok((event, peer_id)) = event_rx.recv().await {
tracing::info!("Sending event {} to peer {}", event, peer_id);
let serialized_msg: Vec<u8> = bincode::serialize(&(event, peer_id.clone()))
.map_err(|e| anyhow!("Failed to serialize message: {}", e))?;
Expand Down Expand Up @@ -462,23 +463,21 @@ pub struct Supervisor {
processes: Mutex<HashMap<TransportPublicKey, SubProcess>>,
waiting_peers: Arc<Mutex<VecDeque<usize>>>,
waiting_gateways: Arc<Mutex<VecDeque<usize>>>,
user_ev_controller: Arc<Mutex<tokio::sync::mpsc::Sender<(u32, TransportPublicKey)>>>,
event_rx: Arc<Mutex<tokio::sync::mpsc::Receiver<(u32, TransportPublicKey)>>>,
user_ev_controller: Mutex<tokio::sync::broadcast::Sender<(u32, TransportPublicKey)>>,
}

impl Supervisor {
pub async fn new(network: &mut SimNetwork) -> Self {
let peers = network.build_peers();
let peers_config = Arc::new(Mutex::new(peers.into_iter().collect::<HashMap<_, _>>()));
let (user_ev_controller, event_rx) = tokio::sync::mpsc::channel(1);
let (user_ev_controller, _) = tokio::sync::broadcast::channel(1);

Supervisor {
peers_config,
processes: Mutex::new(HashMap::new()),
waiting_peers: Arc::new(Mutex::new(VecDeque::new())),
waiting_gateways: Arc::new(Mutex::new(VecDeque::new())),
user_ev_controller: Arc::new(Mutex::new(user_ev_controller)),
event_rx: Arc::new(Mutex::new(event_rx)),
user_ev_controller: Mutex::new(user_ev_controller),
}
}
async fn start_process(
Expand Down
Loading