diff --git a/crates/core/src/config.rs b/crates/core/src/config.rs index 0367ef80a..17c45a1fc 100644 --- a/crates/core/src/config.rs +++ b/crates/core/src/config.rs @@ -14,7 +14,6 @@ use std::{ use directories::ProjectDirs; use libp2p::{identity, PeerId}; use once_cell::sync::Lazy; -use parking_lot::Mutex; use tokio::runtime::Runtime; use crate::local_node::OperationMode; @@ -103,7 +102,7 @@ pub struct ConfigPaths { delegates_dir: PathBuf, secrets_dir: PathBuf, db_dir: PathBuf, - event_log: Mutex, + event_log: PathBuf, } impl ConfigPaths { @@ -118,8 +117,8 @@ impl ConfigPaths { Ok(app_data_dir) } - fn new() -> std::io::Result { - let app_data_dir = Self::app_data_dir()?; + fn new(data_dir: Option) -> std::io::Result { + let app_data_dir = data_dir.map(Ok).unwrap_or_else(Self::app_data_dir)?; let contracts_dir = app_data_dir.join("contracts"); let delegates_dir = app_data_dir.join("delegates"); let secrets_dir = app_data_dir.join("secrets"); @@ -158,7 +157,7 @@ impl ConfigPaths { delegates_dir, secrets_dir, db_dir, - event_log: Mutex::new(event_log), + event_log, }) } } @@ -205,24 +204,14 @@ impl Config { pub fn event_log(&self) -> PathBuf { if self.local_mode.load(std::sync::atomic::Ordering::SeqCst) { - let mut local_file = self.config_paths.event_log.lock().clone(); + let mut local_file = self.config_paths.event_log.clone(); local_file.set_file_name("_EVENT_LOG_LOCAL"); local_file } else { - self.config_paths.event_log.lock().to_owned() + self.config_paths.event_log.to_owned() } } - pub fn set_event_log(path: PathBuf) { - tracing::debug!("setting event log file to: {:?}", &path); - fs::OpenOptions::new() - .write(true) - .create(true) - .open(&path) - .expect("couln't create event log file"); - *Self::conf().config_paths.event_log.lock() = path; - } - pub fn conf() -> &'static Config { CONFIG.get_or_init(|| match Config::load_conf() { Ok(config) => config, @@ -264,7 +253,9 @@ impl Config { .flatten() .unwrap_or(tracing::log::LevelFilter::Info); let (bootstrap_ip, bootstrap_port, bootstrap_id) = Config::get_bootstrap_host(&settings)?; - let config_paths = ConfigPaths::new()?; + + let data_dir = settings.get_string("data_dir").ok().map(PathBuf::from); + let config_paths = ConfigPaths::new(data_dir)?; let local_mode = settings.get_string("network_mode").is_err(); diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index cddf058b6..190fd9b8d 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::fmt::Display; +use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -475,6 +476,10 @@ impl Executor { }) } + pub fn test_data_dir(identifier: &str) -> PathBuf { + std::env::temp_dir().join(format!("freenet-executor-{identifier}")) + } + async fn get_stores( config: &PeerCliConfig, ) -> Result< diff --git a/crates/core/src/contract/executor/mock_runtime.rs b/crates/core/src/contract/executor/mock_runtime.rs index 2d75c3f5a..8ba725286 100644 --- a/crates/core/src/contract/executor/mock_runtime.rs +++ b/crates/core/src/contract/executor/mock_runtime.rs @@ -9,7 +9,7 @@ impl Executor { identifier: &str, event_loop_channel: ExecutorToEventLoopChannel, ) -> Result { - let data_dir = std::env::temp_dir().join(format!("freenet-executor-{identifier}")); + let data_dir = Self::test_data_dir(identifier); let contracts_data_dir = data_dir.join("contracts"); std::fs::create_dir_all(&contracts_data_dir).expect("directory created"); @@ -17,8 +17,6 @@ impl Executor { let db_path = data_dir.join("db"); std::fs::create_dir_all(&db_path).expect("directory created"); - let log_file = data_dir.join("_EVENT_LOG_LOCAL"); - crate::config::Config::set_event_log(log_file); let state_store = StateStore::new(Storage::new(Some(&db_path)).await?, u16::MAX as u32).unwrap(); diff --git a/crates/core/src/message.rs b/crates/core/src/message.rs index aa78fc410..2ea13ede1 100644 --- a/crates/core/src/message.rs +++ b/crates/core/src/message.rs @@ -116,7 +116,7 @@ impl Transaction { impl<'a> arbitrary::Arbitrary<'a> for Transaction { fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result { let ty: TransactionTypeId = u.arbitrary()?; - let bytes: u128 = u.arbitrary()?; + let bytes: u128 = Ulid::new().0; Ok(Self::update(ty.0, Ulid(bytes))) } } diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index a57402a08..f715403bc 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -193,13 +193,15 @@ impl NodeConfig { { use super::tracing::{CombinedRegister, OTEventRegister}; CombinedRegister::new([ - Box::new(EventRegister::new()), + Box::new(EventRegister::new( + crate::config::Config::conf().event_log(), + )), Box::new(OTEventRegister::new()), ]) } #[cfg(not(feature = "trace-ot"))] { - EventRegister::new() + EventRegister::new(crate::config::Config::conf().event_log()) } }; let node = NodeP2P::build::( @@ -580,8 +582,10 @@ async fn report_result( second_trace_lines.join("\n") }) .unwrap_or_default(); - let log = - format!("Transaction ({tx}) error trace:\n {trace} \nstate:\n {state:?}\n"); + let peer = &op_manager.ring.peer_key; + let log = format!( + "Transaction ({tx} @ {peer}) error trace:\n {trace} \nstate:\n {state:?}\n" + ); std::io::stderr().write_all(log.as_bytes()).unwrap(); } #[cfg(not(any(debug_assertions, test)))] diff --git a/crates/core/src/node/testing_impl.rs b/crates/core/src/node/testing_impl.rs index f65ae40ef..e7364f17e 100644 --- a/crates/core/src/node/testing_impl.rs +++ b/crates/core/src/node/testing_impl.rs @@ -816,7 +816,7 @@ impl SimNetwork { /// meaning that: /// /// - at least 50% of the peers have more than the minimum connections - /// - + /// - the average number of connections per peer is above the mean between max and min connections pub fn network_connectivity_quality(&self) -> Result<(), anyhow::Error> { const HIGHER_THAN_MIN_THRESHOLD: f64 = 0.5; let num_nodes = self.number_of_nodes; diff --git a/crates/core/src/node/testing_impl/inter_process.rs b/crates/core/src/node/testing_impl/inter_process.rs index efc4e5b1e..c682e93b9 100644 --- a/crates/core/src/node/testing_impl/inter_process.rs +++ b/crates/core/src/node/testing_impl/inter_process.rs @@ -30,13 +30,15 @@ impl SimPeer { { use crate::tracing::{CombinedRegister, OTEventRegister}; CombinedRegister::new([ - Box::new(EventRegister::new()), + Box::new(EventRegister::new( + crate::config::Config::conf().event_log(), + )), Box::new(OTEventRegister::new()), ]) } #[cfg(not(feature = "trace-ot"))] { - EventRegister::new() + EventRegister::new(crate::config::Config::conf().event_log()) } }; self.run_node(event_generator, event_register).await diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index b4cec3401..66bb9ee37 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -2,6 +2,7 @@ use std::backtrace::Backtrace as StdTrace; use std::{pin::Pin, time::Duration}; +use freenet_stdlib::prelude::ContractKey; use futures::{future::BoxFuture, Future}; use tokio::sync::mpsc::error::SendError; @@ -300,11 +301,7 @@ impl From> for OpError { } /// If the contract is not found, it will try to get it first if the `try_get` parameter is set. -async fn start_subscription_request( - op_manager: &OpManager, - key: freenet_stdlib::prelude::ContractKey, - try_get: bool, -) { +async fn start_subscription_request(op_manager: &OpManager, key: ContractKey, try_get: bool) { let sub_op = subscribe::start_op(key.clone()); if let Err(error) = subscribe::request_subscribe(op_manager, sub_op).await { if !try_get { @@ -322,3 +319,23 @@ async fn start_subscription_request( } } } + +async fn has_contract(op_manager: &OpManager, key: ContractKey) -> Result { + match op_manager + .notify_contract_handler(crate::contract::ContractHandlerEvent::GetQuery { + key, + fetch_contract: false, + }) + .await? + { + crate::contract::ContractHandlerEvent::GetResponse { + response: Ok(crate::contract::StoreResponse { state: Some(_), .. }), + .. + } => Ok(true), + crate::contract::ContractHandlerEvent::GetResponse { + response: Ok(crate::contract::StoreResponse { state: None, .. }), + .. + } => Ok(false), + _ => Err(OpError::UnexpectedOpState), + } +} diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index d6b6931c5..9f3ed2e87 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -1221,11 +1221,27 @@ mod test { /// Given a network of one node and one gateway test that both are connected. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn one_node_connects_to_gw() { - let mut sim_nodes = SimNetwork::new("join_one_node_connects_to_gw", 1, 1, 1, 1, 2, 2).await; - sim_nodes.start().await; - tokio::time::sleep(Duration::from_millis(100)).await; - assert!(sim_nodes.connected(&"node-0".into())); + async fn one_node_connects_to_gw() -> Result<(), anyhow::Error> { + const NUM_NODES: usize = 1usize; + const NUM_GW: usize = 1usize; + const MAX_HTL: usize = 1usize; + const RAND_IF_HTL_ABOVE: usize = 1usize; + const MAX_CONNS: usize = 1usize; + const MIN_CONNS: usize = 1usize; + let mut sim_nw = SimNetwork::new( + "join_one_node_connects_to_gw", + NUM_NODES, + NUM_GW, + MAX_HTL, + RAND_IF_HTL_ABOVE, + MAX_CONNS, + MIN_CONNS, + ) + .await; + sim_nw.start().await; + sim_nw.check_connectivity(Duration::from_secs(1))?; + assert!(sim_nw.connected(&"node-1".into())); + Ok(()) } /// Once a gateway is left without remaining open slots, ensure forwarding connects @@ -1233,14 +1249,18 @@ mod test { async fn forward_connection_to_node() -> Result<(), anyhow::Error> { const NUM_NODES: usize = 3usize; const NUM_GW: usize = 1usize; + const MAX_HTL: usize = 2usize; + const RAND_IF_HTL_ABOVE: usize = 1usize; + const MAX_CONNS: usize = 2usize; + const MIN_CONNS: usize = 1usize; let mut sim_nw = SimNetwork::new( "join_forward_connection_to_node", NUM_GW, NUM_NODES, - 2, - 1, - 2, - 1, + MAX_HTL, + RAND_IF_HTL_ABOVE, + MAX_CONNS, + MIN_CONNS, ) .await; // sim_nw.with_start_backoff(Duration::from_millis(100)); @@ -1264,17 +1284,20 @@ mod test { // crate::config::set_logger(); const NUM_NODES: usize = 10usize; const NUM_GW: usize = 2usize; + const MAX_HTL: usize = 5usize; + const RAND_IF_HTL_ABOVE: usize = 3usize; + const MAX_CONNS: usize = 4usize; + const MIN_CONNS: usize = 2usize; let mut sim_nw = SimNetwork::new( "join_all_nodes_should_connect", NUM_GW, NUM_NODES, - 5, - 3, - 6, - 2, + MAX_HTL, + RAND_IF_HTL_ABOVE, + MAX_CONNS, + MIN_CONNS, ) .await; - sim_nw.with_start_backoff(Duration::from_millis(100)); sim_nw.start().await; sim_nw.check_connectivity(Duration::from_secs(10))?; // wait for a bit so peers can acquire more connections diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs index 7d7503ec2..27280277b 100644 --- a/crates/core/src/operations/get.rs +++ b/crates/core/src/operations/get.rs @@ -81,6 +81,7 @@ pub(crate) async fn request_get(op_manager: &OpManager, get_op: GetOp) -> Result retries: 0, fetch_contract, requester: None, + current_hop: op_manager.ring.max_hops_to_live, }); let msg = GetMsg::RequestGet { @@ -109,6 +110,7 @@ pub(crate) async fn request_get(op_manager: &OpManager, get_op: GetOp) -> Result Ok(()) } +#[derive(Debug)] enum GetState { /// A new petition for a get op. ReceivedRequest, @@ -124,6 +126,7 @@ enum GetState { requester: Option, fetch_contract: bool, retries: usize, + current_hop: usize, }, } @@ -465,7 +468,7 @@ impl Operation for GetOp { fetch_contract, retries, requester, - .. + current_hop, }) => { // todo: register in the stats for the outcome of the op that failed to get a response from this peer if retries < MAX_RETRIES { @@ -484,7 +487,7 @@ impl Operation for GetOp { target, sender: *this_peer, fetch_contract, - htl: op_manager.ring.max_hops_to_live, + htl: current_hop, skip_list: new_skip_list.clone(), }); } else { @@ -494,6 +497,7 @@ impl Operation for GetOp { retries: retries + 1, fetch_contract, requester, + current_hop, }); } else { tracing::error!( @@ -706,7 +710,13 @@ impl Operation for GetOp { skip_list: skip_list.clone(), }); } - _ => return Err(OpError::invalid_transition(self.id)), + Some(other) => { + return Err(OpError::invalid_transition_with_state( + self.id, + Box::new(other), + )) + } + None => return Err(OpError::invalid_transition(self.id)), }; } } @@ -784,7 +794,7 @@ async fn try_forward_or_return( let Some(new_target) = op_manager .ring - .closest_potentially_caching(&key, [&sender.peer].as_slice()) + .closest_potentially_caching(&key, new_skip_list.as_slice()) else { tracing::warn!( tx = %id, @@ -806,6 +816,7 @@ async fn try_forward_or_return( requester: Some(sender), retries: 0, fetch_contract, + current_hop: new_htl, }), Some(GetMsg::SeekNode { id, diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 05699b035..c6f172f79 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -35,6 +35,7 @@ enum SubscribeState { skip_list: Vec, retries: usize, upstream_subscriber: Option, + current_hop: usize, }, Completed {}, } @@ -65,26 +66,10 @@ pub(crate) async fn request_subscribe( sub_op: SubscribeOp, ) -> Result<(), OpError> { let (target, _id) = if let Some(SubscribeState::PrepareRequest { id, key }) = &sub_op.state { - match op_manager - .notify_contract_handler(crate::contract::ContractHandlerEvent::GetQuery { - key: key.clone(), - fetch_contract: false, - }) - .await? - { - crate::contract::ContractHandlerEvent::GetResponse { - response: Ok(crate::contract::StoreResponse { state: Some(_), .. }), - .. - } => {} - crate::contract::ContractHandlerEvent::GetResponse { - key, - response: Ok(crate::contract::StoreResponse { state: None, .. }), - } => { - return Err(OpError::ContractError(ContractError::ContractNotFound( - key.clone(), - ))); - } - _ => return Err(OpError::UnexpectedOpState), + if !super::has_contract(op_manager, key.clone()).await? { + return Err(OpError::ContractError(ContractError::ContractNotFound( + key.clone(), + ))); } const EMPTY: &[PeerId] = &[]; ( @@ -105,6 +90,7 @@ pub(crate) async fn request_subscribe( let new_state = Some(SubscribeState::AwaitingResponse { skip_list: vec![], retries: 0, + current_hop: op_manager.ring.max_hops_to_live, upstream_subscriber: None, }); let msg = SubscribeMsg::RequestSub { id, key, target }; @@ -236,7 +222,7 @@ impl Operation for SubscribeOp { retries, } => { let this_peer = op_manager.ring.own_location(); - let return_err = || -> OperationResult { + let return_not_subbed = || -> OperationResult { OperationResult { return_msg: Some(NetMessage::from(SubscribeMsg::ReturnSub { key: key.clone(), @@ -249,32 +235,34 @@ impl Operation for SubscribeOp { } }; - if !op_manager.ring.is_subscribed_to_contract(key) { - tracing::debug!(tx = %id, "Contract {} not found at {}, trying other peer", key, target.peer); + if !super::has_contract(op_manager, key.clone()).await? { + tracing::debug!(tx = %id, %key, "Contract not found, trying other peer"); let Some(new_target) = op_manager .ring .closest_potentially_caching(key, skip_list.as_slice()) else { - tracing::warn!(tx = %id, "No peer found while trying getting contract {key}"); - return Err(OpError::RingError(RingError::NoCachingPeers(key.clone()))); + tracing::warn!(tx = %id, %key, "No target peer found while trying getting contract"); + return Ok(return_not_subbed()); }; let new_htl = htl - 1; if new_htl == 0 { - return Ok(return_err()); + tracing::debug!(tx = %id, %key, "Max number of hops reached while trying to get contract"); + return Ok(return_not_subbed()); } let mut new_skip_list = skip_list.clone(); new_skip_list.push(target.peer); - tracing::debug!(tx = %id, "Forward request to peer: {}", new_target.peer); + tracing::debug!(tx = %id, new_target = %new_target.peer, "Forward request to peer"); // Retry seek node when the contract to subscribe has not been found in this node return build_op_result( *id, Some(SubscribeState::AwaitingResponse { skip_list: new_skip_list.clone(), retries: *retries, + current_hop: new_htl, upstream_subscriber: Some(*subscriber), }), (SubscribeMsg::SeekNode { @@ -291,8 +279,9 @@ impl Operation for SubscribeOp { } if op_manager.ring.add_subscriber(key, *subscriber).is_err() { + tracing::debug!(tx = %id, %key, "Max number of subscribers reached for contract"); // max number of subscribers for this contract reached - return Ok(return_err()); + return Ok(return_not_subbed()); } match self.state { @@ -334,6 +323,7 @@ impl Operation for SubscribeOp { mut skip_list, retries, upstream_subscriber, + current_hop, }) => { if retries < MAX_RETRIES { skip_list.push(sender.peer); @@ -350,7 +340,7 @@ impl Operation for SubscribeOp { subscriber, target, skip_list: skip_list.clone(), - htl: op_manager.ring.max_hops_to_live, + htl: current_hop, retries: retries + 1, }); } else { @@ -360,6 +350,7 @@ impl Operation for SubscribeOp { skip_list, retries: retries + 1, upstream_subscriber, + current_hop, }); } else { return Err(OpError::MaxRetriesExceeded( diff --git a/crates/core/src/ring.rs b/crates/core/src/ring.rs index 50e976f5d..b169da080 100644 --- a/crates/core/src/ring.rs +++ b/crates/core/src/ring.rs @@ -32,7 +32,7 @@ use tracing::Instrument; use crate::message::TransactionType; use crate::topology::{AcquisitionStrategy, TopologyManager}; -use crate::tracing::{EventRegister, NetEventLog, NetEventRegister}; +use crate::tracing::{NetEventLog, NetEventRegister}; use crate::util::Contains; use crate::{ config::GlobalExecutor, @@ -217,7 +217,7 @@ impl Ring { /// Max hops to be performed for certain operations (e.g. propagating connection of a peer in the network). const DEFAULT_MAX_HOPS_TO_LIVE: usize = 10; - pub fn new( + pub fn new( config: &NodeConfig, event_loop_notifier: EventLoopNotificationsSender, event_register: ER, @@ -255,7 +255,7 @@ impl Ring { }; let router = Arc::new(RwLock::new(Router::new(&[]))); - GlobalExecutor::spawn(Self::refresh_router::(router.clone())); + GlobalExecutor::spawn(Self::refresh_router(router.clone(), event_register.clone())); // Just initialize with a fake location, this will be later updated when the peer has an actual location assigned. let topology_manager = RwLock::new(TopologyManager::new(Location::new(0.0))); @@ -307,32 +307,23 @@ impl Ring { .load(std::sync::atomic::Ordering::Acquire) } - async fn refresh_router(router: Arc>) { + async fn refresh_router(router: Arc>, register: ER) { let mut interval = tokio::time::interval(Duration::from_secs(60 * 5)); interval.tick().await; loop { interval.tick().await; - #[cfg(feature = "trace-ot")] - let should_route = std::any::type_name::() - == std::any::type_name::() - || std::any::type_name::() - == std::any::type_name::>(); - #[cfg(not(feature = "trace-ot"))] - let should_route = - std::any::type_name::() == std::any::type_name::(); - let history = if should_route { - EventRegister::get_router_events(10_000) - .await - .map_err(|error| { - tracing::error!("shutting down refresh router task"); - error - }) - .expect("todo: propagate this to main thread") - } else { - vec![] - }; - let router_ref = &mut *router.write(); - *router_ref = Router::new(&history); + let history = register + .get_router_events(10_000) + .await + .map_err(|error| { + tracing::error!("shutting down refresh router task"); + error + }) + .expect("todo: propagate this to main thread"); + if !history.is_empty() { + let router_ref = &mut *router.write(); + *router_ref = Router::new(&history); + } } } diff --git a/crates/core/src/tracing.rs b/crates/core/src/tracing.rs index 675293545..84e3cb558 100644 --- a/crates/core/src/tracing.rs +++ b/crates/core/src/tracing.rs @@ -1,4 +1,9 @@ -use std::{io, path::Path, time::SystemTime}; +use std::{ + io, + path::{Path, PathBuf}, + sync::Arc, + time::SystemTime, +}; use chrono::{DateTime, Utc}; use either::Either; @@ -43,6 +48,7 @@ pub(crate) trait NetEventRegister: std::any::Any + Send + Sync + 'static { ) -> BoxFuture<'a, ()>; fn notify_of_time_out(&mut self, tx: Transaction) -> BoxFuture<()>; fn trait_clone(&self) -> Box; + fn get_router_events(&self, number: usize) -> BoxFuture, DynError>>; } #[cfg(feature = "trace-ot")] @@ -81,6 +87,19 @@ impl NetEventRegister for CombinedRegister { } .boxed() } + + fn get_router_events(&self, number: usize) -> BoxFuture, DynError>> { + async move { + for reg in &self.0 { + let events = reg.get_router_events(number).await?; + if !events.is_empty() { + return Ok(events); + } + } + Ok(vec![]) + } + .boxed() + } } #[cfg(feature = "trace-ot")] @@ -369,6 +388,7 @@ impl<'a> From<&'a NetLogMessage> for Option> { #[derive(Clone)] pub(crate) struct EventRegister { + log_file: Arc, log_sender: mpsc::Sender, } @@ -388,22 +408,28 @@ impl EventRegister { const BATCH_SIZE: usize = EVENT_REGISTER_BATCH_SIZE; - pub fn new() -> Self { + pub fn new(event_log_path: PathBuf) -> Self { let (log_sender, log_recv) = mpsc::channel(1000); NEW_RECORDS_TS.get_or_init(SystemTime::now); - GlobalExecutor::spawn(Self::record_logs(log_recv)); - Self { log_sender } + let log_file = Arc::new(event_log_path); + GlobalExecutor::spawn(Self::record_logs(log_recv, log_file.clone())); + Self { + log_sender, + log_file, + } } - async fn record_logs(mut log_recv: mpsc::Receiver) { + async fn record_logs( + mut log_recv: mpsc::Receiver, + event_log_path: Arc, + ) { use futures::StreamExt; tokio::time::sleep(std::time::Duration::from_millis(200)).await; // wait for the node to start - let event_log_path = crate::config::Config::conf().event_log(); let mut event_log = match OpenOptions::new() .write(true) .read(true) - .open(&event_log_path) + .open(&*event_log_path) .await { Ok(file) => file, @@ -609,26 +635,6 @@ impl EventRegister { } } - #[cfg(test)] - { - assert!(!buffer.is_empty()); - let mut unique = std::collections::HashSet::new(); - let mut read_buf = &*buffer; - let mut length_bytes: [u8; 4] = [0u8; 4]; - let mut cursor = 0; - while read_buf.read_exact(&mut length_bytes).await.is_ok() { - let length = u32::from_be_bytes(length_bytes) as usize; - cursor += 4; - let log: NetLogMessage = - bincode::deserialize(&buffer[cursor..cursor + length]).unwrap(); - cursor += length; - read_buf = &buffer[cursor..]; - unique.insert(log.peer_id); - // tracing::debug!(?log, %cursor); - } - assert!(unique.len() > 1); - } - // Seek back to the beginning and write the remaining content file.rewind().await?; file.write_all(&buffer).await?; @@ -639,12 +645,14 @@ impl EventRegister { Ok(()) } - pub async fn get_router_events(max_event_number: usize) -> Result, DynError> { + pub async fn get_router_events( + max_event_number: usize, + event_log_path: &Path, + ) -> Result, DynError> { use tokio::io::{AsyncReadExt, AsyncSeekExt}; const MAX_EVENT_HISTORY: usize = 10_000; let event_num = max_event_number.min(MAX_EVENT_HISTORY); - let event_log_path = crate::config::Config::conf().event_log(); // tracing::info!(?event_log_path); let _guard: tokio::sync::MutexGuard<'_, ()> = FILE_LOCK.lock().await; let mut file = @@ -723,6 +731,10 @@ impl NetEventRegister for EventRegister { fn notify_of_time_out(&mut self, _: Transaction) -> BoxFuture<()> { async {}.boxed() } + + fn get_router_events(&self, number: usize) -> BoxFuture, DynError>> { + async move { EventRegister::get_router_events(number, &self.log_file).await }.boxed() + } } async fn connect_to_metrics_server() -> Option>> { @@ -1058,6 +1070,13 @@ mod opentelemetry_tracer { } .boxed() } + + fn get_router_events( + &self, + _number: usize, + ) -> BoxFuture, DynError>> { + async { Ok(vec![]) }.boxed() + } } } @@ -1226,13 +1245,13 @@ pub(super) mod test { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn event_register_read_write() -> Result<(), DynError> { use std::time::Duration; - let event_log_path = crate::config::Config::conf().event_log(); - // truncate the log if it exists - std::fs::File::create(event_log_path).unwrap(); + let temp_dir = tempfile::tempdir()?; + let log_path = temp_dir.path().join("event_log"); + std::fs::File::create(&log_path)?; // force a truncation const TEST_LOGS: usize = EventRegister::MAX_LOG_RECORDS + 100; - let register = EventRegister::new(); + let register = EventRegister::new(log_path.clone()); let bytes = crate::util::test::random_bytes_2mb(); let mut gen = arbitrary::Unstructured::new(&bytes); let mut transactions = vec![]; @@ -1244,11 +1263,15 @@ pub(super) mod test { let peer: PeerId = gen.arbitrary()?; peers.push(peer); } - for _ in 0..TEST_LOGS { + let mut total_route_events = 0; + for i in 0..TEST_LOGS { let kind: EventKind = gen.arbitrary()?; + if matches!(kind, EventKind::Route(_)) { + total_route_events += 1; + } events.push(NetEventLog { - tx: transactions.last().unwrap(), - peer_id: peers.last().unwrap(), + tx: &transactions[i], + peer_id: &peers[i], kind, }); } @@ -1256,9 +1279,10 @@ pub(super) mod test { while register.log_sender.capacity() != 1000 { tokio::time::sleep(Duration::from_millis(500)).await; } - tokio::time::sleep(Duration::from_millis(3_000)).await; - let ev = EventRegister::get_router_events(100).await?; - assert!(!ev.is_empty()); + tokio::time::sleep(Duration::from_millis(1_000)).await; + let ev = + EventRegister::get_router_events(EventRegister::MAX_LOG_RECORDS, &log_path).await?; + assert_eq!(ev.len(), total_route_events); Ok(()) } @@ -1479,6 +1503,13 @@ pub(super) mod test { fn notify_of_time_out(&mut self, _: Transaction) -> BoxFuture<()> { async {}.boxed() } + + fn get_router_events( + &self, + _number: usize, + ) -> BoxFuture, DynError>> { + async { Ok(vec![]) }.boxed() + } } #[tokio::test] diff --git a/crates/core/src/util.rs b/crates/core/src/util.rs index b7089bb98..a90cfdb7b 100644 --- a/crates/core/src/util.rs +++ b/crates/core/src/util.rs @@ -237,6 +237,12 @@ impl<'x> Contains for &'x [&PeerId] { } } +impl<'x> Contains for &'x Vec<&PeerId> { + fn has_element(&self, target: &PeerId) -> bool { + self.contains(&target) + } +} + #[cfg(test)] pub mod tests { diff --git a/crates/fdev/src/testing/multiple_process.rs b/crates/fdev/src/testing/multiple_process.rs index ea1d8a9de..d3ca4f9cb 100644 --- a/crates/fdev/src/testing/multiple_process.rs +++ b/crates/fdev/src/testing/multiple_process.rs @@ -6,8 +6,12 @@ use std::{ }; use anyhow::anyhow; -use freenet::dev_tool::{ - EventChain, InterProcessConnManager, MemoryEventsGen, NodeConfig, NodeLabel, PeerId, SimPeer, +use freenet::{ + dev_tool::{ + EventChain, InterProcessConnManager, MemoryEventsGen, NodeConfig, NodeLabel, PeerId, + Runtime, SimPeer, + }, + local_node::Executor, }; use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}; use rand::Rng; @@ -69,6 +73,8 @@ pub struct MultiProcessConfig { pub mode: Process, #[arg(long)] id: Option, + #[arg(long)] + data_dir: Option, } #[derive(Default, Clone, clap::ValueEnum)] @@ -93,7 +99,7 @@ pub(super) async fn run( ) -> anyhow::Result<(), Error> { match cmd_config.mode { Process::Supervisor => supervisor(config).await, - Process::Child => child(config, cmd_config.id.expect("id should be set for child")).await, + Process::Child => child(config, cmd_config).await, } } @@ -320,11 +326,15 @@ struct SubProcess { impl SubProcess { fn start(cmd_args: &[String], label: &NodeLabel, id: PeerId) -> anyhow::Result { + // the identifier used for multi-process tests is the peer id + let data_dir = Executor::::test_data_dir(&id.to_string()); let child = Command::new("fdev") .kill_on_drop(true) .args(cmd_args) .arg("--id") .arg(label.number().to_string()) + .arg("--data-dir") + .arg(data_dir.to_str().expect("valid path")) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::inherit()) @@ -381,9 +391,18 @@ impl SubProcess { } } -async fn child(test_config: &super::TestConfig, id: usize) -> anyhow::Result<()> { +async fn child( + test_config: &super::TestConfig, + child_config: &MultiProcessConfig, +) -> anyhow::Result<()> { + let id = child_config.id.expect("id should be set for child process"); + let data_dir = child_config + .data_dir + .as_ref() + .expect("data_dir should be set for child process"); // write logs to stderr so stdout and stdin are free of unexpected data std::env::set_var("FREENET_LOG_TO_STDERR", "1"); + std::env::set_var("FREENET_DATA_DIR", data_dir); let (user_ev_controller, mut receiver_ch) = tokio::sync::watch::channel((0, PeerId::random())); receiver_ch.borrow_and_update();