From 488a1cfe3644c7a30c362167c075575a8939d62c Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Wed, 11 Oct 2023 12:50:30 +0200 Subject: [PATCH 1/5] Connect router and feed live events --- Cargo.lock | 12 ++ crates/core/Cargo.toml | 1 + crates/core/src/contract/executor.rs | 10 +- crates/core/src/message.rs | 22 +-- crates/core/src/node.rs | 57 ++++-- .../core/src/node/conn_manager/p2p_protoc.rs | 71 ++++++-- crates/core/src/node/op_state.rs | 5 + crates/core/src/node/p2p_impl.rs | 18 +- crates/core/src/operations.rs | 63 ++++--- crates/core/src/operations/get.rs | 169 ++++++++++++++---- crates/core/src/operations/join_ring.rs | 60 +++++-- crates/core/src/operations/op_trait.rs | 8 +- crates/core/src/operations/put.rs | 141 ++++++++++++--- crates/core/src/operations/subscribe.rs | 64 +++---- crates/core/src/operations/update.rs | 4 +- crates/core/src/ring.rs | 13 +- crates/core/src/router.rs | 33 ++-- crates/core/src/router/isotonic_estimator.rs | 14 +- crates/core/src/router/util.rs | 6 +- 19 files changed, 545 insertions(+), 226 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1e5d9ba7c..97e3a6e1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1180,6 +1180,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "delegate" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ee5df75c70b95bd3aacc8e2fd098797692fb1d54121019c4de481e42f04c8a1" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "der" version = "0.7.8" @@ -1567,6 +1578,7 @@ dependencies = [ "crossbeam", "ctrlc", "dashmap", + "delegate", "directories", "either", "freenet-stdlib", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index cbdb2ef0f..519a20d39 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -31,6 +31,7 @@ config = { version = "0.13.0", features = [ "toml" ] } crossbeam = "0.8.2" ctrlc = { version = "3.4", features = ["termination"] } dashmap = "^5.5" +delegate = "0.10" directories = "5" either = { workspace = true , features = ["serde"] } futures = "0.3.21" diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index 01236c011..b6a86baf2 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -31,7 +31,7 @@ use crate::runtime::{ }; use crate::{ client_events::{ClientId, HostResult}, - node::{NodeConfig, P2pBridge}, + node::NodeConfig, operations::{self, op_trait::Operation}, DynError, }; @@ -136,7 +136,7 @@ impl ExecutorToEventLoopChannel { async fn send_to_event_loop(&mut self, message: T) -> Result<(), DynError> where T: ComposeNetworkMessage, - Op: Operation + Send + 'static, + Op: Operation + Send + 'static, { let op = message.initiate_op(&self.op_manager); self.end.sender.send(*op.id()).await?; @@ -178,7 +178,7 @@ mod sealed { trait ComposeNetworkMessage where Self: Sized, - Op: Operation + Send + 'static, + Op: Operation + Send + 'static, { fn initiate_op(self, op_manager: &OpManager) -> Op { todo!() @@ -205,7 +205,7 @@ impl ComposeNetworkMessage for GetContract { op: operations::get::GetOp, op_manager: &OpManager, ) -> Result { - let id = *>::id(&op); + let id = *op.id(); operations::get::request_get(op_manager, op, None).await?; Ok(id) } @@ -321,7 +321,7 @@ impl Executor { // dependencies to be resolved async fn op_request(&mut self, request: M) -> Result where - Op: Operation + Send + 'static, + Op: Operation + Send + 'static, M: ComposeNetworkMessage, { debug_assert!(self.event_loop_channel.is_some()); diff --git a/crates/core/src/message.rs b/crates/core/src/message.rs index 35d0acd6a..4f4bd3b7a 100644 --- a/crates/core/src/message.rs +++ b/crates/core/src/message.rs @@ -1,9 +1,6 @@ //! Main message type which encapsulated all the messaging between nodes. -use std::{ - fmt::Display, - time::{Duration, SystemTime}, -}; +use std::{fmt::Display, time::Duration}; use serde::{Deserialize, Serialize}; use uuid::{ @@ -45,13 +42,7 @@ static UUID_CONTEXT: Context = Context::new(14); impl Transaction { pub fn new(ty: TransactionTypeId, initial_peer: &PeerKey) -> Transaction { // using v1 UUID to keep to keep track of the creation ts - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("infallible"); - let now_secs = now.as_secs(); - let now_nanos = now.as_nanos(); - let now_nanos = now_nanos - (now_secs as u128 * 1_000_000_000); - let ts = Timestamp::from_unix(&UUID_CONTEXT, now_secs, now_nanos as u32); + let ts: Timestamp = uuid::timestamp::Timestamp::now(&UUID_CONTEXT); // event in the net this UUID should be unique since peer keys are unique // however some id collision may be theoretically possible if two transactions @@ -61,8 +52,8 @@ impl Transaction { let b = &mut [0; 6]; b.copy_from_slice(&initial_peer.to_bytes()[0..6]); let id = Uuid::new_v1(ts, b); - // 2 word size for 64-bits platforms most likely since msg type - // probably will be aligned to 64 bytes + + // 3 word size for 64-bits platforms Self { id, ty } } @@ -231,6 +222,11 @@ impl Message { Canceled(_) => true, } } + + pub fn track_stats(&self) -> bool { + use Message::*; + !matches!(self, JoinRing(_) | Subscribe(_) | Canceled(_)) + } } impl Display for Message { diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 489f5db70..fc38d6dcf 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -36,14 +36,15 @@ use crate::{ operations::{ get, join_ring::{self, JoinRingMsg, JoinRingOp}, - put, subscribe, OpEnum, OpError, + put, subscribe, OpEnum, OpError, OpOutcome, }, ring::{Location, PeerKeyLocation}, + router::{RouteEvent, RouteOutcome}, util::{ExponentialBackoff, IterExt}, }; use crate::operations::handle_op_request; -pub(crate) use conn_manager::{p2p_protoc::P2pBridge, ConnectionBridge, ConnectionError}; +pub(crate) use conn_manager::{ConnectionBridge, ConnectionError}; pub(crate) use op_state::OpManager; mod conn_manager; @@ -440,19 +441,53 @@ macro_rules! log_handling_msg { }; } -#[inline(always)] async fn report_result( op_result: Result, OpError>, + op_storage: &OpManager, executor_callback: Option>, client_req_handler_callback: Option<(ClientId, ClientResponsesSender)>, ) { match op_result { - Ok(Some(res)) => { + Ok(Some(op_res)) => { if let Some((client_id, cb)) = client_req_handler_callback { - let _ = cb.send((client_id, res.to_host_result(client_id))); + let _ = cb.send((client_id, op_res.to_host_result(client_id))); + } + // check operations.rs:handle_op_result to see what's the meaning of each state + // in case more cases want to be handled when feeding information to the OpManager + + match op_res.outcome() { + OpOutcome::ContractOpSuccess { + target_peer, + contract_location, + first_response_time, + payload_size, + payload_transfer_time, + } => { + op_storage.ring.routing_finished(RouteEvent { + peer: *target_peer, + contract_location, + outcome: RouteOutcome::Success { + time_to_response_start: first_response_time, + payload_size, + payload_transfer_time, + }, + }); + } + // todo: handle failures, need to track timeouts and other potential failures + // OpOutcome::ContractOpFailure { + // target_peer: Some(target_peer), + // contract_location, + // } => { + // op_storage.ring.routing_finished(RouteEvent { + // peer: *target_peer, + // contract_location, + // outcome: RouteOutcome::Failure, + // }); + // } + OpOutcome::Incomplete | OpOutcome::Irrelevant => {} } if let Some(mut cb) = executor_callback { - cb.response(res).await; + cb.response(op_res).await; } } Ok(None) => {} @@ -489,7 +524,7 @@ async fn process_message( client_id, ) .await; - report_result(op_result, executor_callback, cli_req).await; + report_result(op_result, &op_storage, executor_callback, cli_req).await; } Message::Put(op) => { log_handling_msg!("put", *op.id(), op_storage); @@ -500,7 +535,7 @@ async fn process_message( client_id, ) .await; - report_result(op_result, executor_callback, cli_req).await; + report_result(op_result, &op_storage, executor_callback, cli_req).await; } Message::Get(op) => { log_handling_msg!("get", op.id(), op_storage); @@ -511,7 +546,7 @@ async fn process_message( client_id, ) .await; - report_result(op_result, executor_callback, cli_req).await; + report_result(op_result, &op_storage, executor_callback, cli_req).await; } Message::Subscribe(op) => { log_handling_msg!("subscribe", op.id(), op_storage); @@ -522,13 +557,13 @@ async fn process_message( client_id, ) .await; - report_result(op_result, executor_callback, cli_req).await; + report_result(op_result, &op_storage, executor_callback, cli_req).await; } _ => {} } } Err(err) => { - report_result(Err(err.into()), executor_callback, cli_req).await; + report_result(Err(err.into()), &op_storage, executor_callback, cli_req).await; } } } diff --git a/crates/core/src/node/conn_manager/p2p_protoc.rs b/crates/core/src/node/conn_manager/p2p_protoc.rs index 1f2573591..4eee493ad 100644 --- a/crates/core/src/node/conn_manager/p2p_protoc.rs +++ b/crates/core/src/node/conn_manager/p2p_protoc.rs @@ -62,6 +62,7 @@ fn config_behaviour( local_key: &Keypair, gateways: &[InitPeerNode], _public_addr: &Option, + op_manager: Arc, ) -> NetBehaviour { let routing_table: HashMap<_, _> = gateways .iter() @@ -101,6 +102,7 @@ fn config_behaviour( connected: HashMap::new(), openning_connection: HashSet::new(), inbound: VecDeque::new(), + op_manager, }, } } @@ -177,6 +179,7 @@ impl P2pConnManager { pub fn build( transport: transport::Boxed<(PeerId, muxing::StreamMuxerBox)>, config: &NodeBuilder, + op_manager: Arc, ) -> Result { // We set a global executor which is virtually the Tokio multi-threaded executor // to reuse it's thread pool and scheduler in order to drive futures. @@ -191,7 +194,12 @@ impl P2pConnManager { let builder = SwarmBuilder::with_executor( transport, - config_behaviour(&config.local_key, &config.remote_nodes, &public_addr), + config_behaviour( + &config.local_key, + &config.remote_nodes, + &public_addr, + op_manager, + ), PeerId::from(config.local_key.public()), global_executor, ); @@ -505,6 +513,7 @@ pub(in crate::node) struct FreenetBehaviour { routing_table: HashMap>, connected: HashMap, openning_connection: HashSet, + op_manager: Arc, } impl NetworkBehaviour for FreenetBehaviour { @@ -526,7 +535,7 @@ impl NetworkBehaviour for FreenetBehaviour { .entry(peer_id) .or_default() .insert(remote_addr.clone()); - Ok(Handler::new()) + Ok(Handler::new(self.op_manager.clone())) } fn handle_established_outbound_connection( @@ -543,7 +552,7 @@ impl NetworkBehaviour for FreenetBehaviour { .entry(peer_id) .or_default() .insert(addr.clone()); - Ok(Handler::new()) + Ok(Handler::new(self.op_manager.clone())) } fn on_connection_handler_event( @@ -634,6 +643,7 @@ pub(in crate::node) struct Handler { uniq_conn_id: UniqConnId, protocol_status: ProtocolStatus, pending: Vec, + op_manager: Arc, } #[allow(dead_code)] @@ -668,6 +678,7 @@ enum SubstreamState { PendingFlush { conn_id: UniqConnId, substream: FreenetStream, + op_id: Option, }, /// Waiting for an answer back from the remote. WaitingMsg { @@ -685,13 +696,14 @@ impl SubstreamState { } impl Handler { - fn new() -> Self { + fn new(op_manager: Arc) -> Self { Self { substreams: vec![], keep_alive: KeepAlive::Until(Instant::now() + config::PEER_TIMEOUT), uniq_conn_id: 0, protocol_status: ProtocolStatus::Unconfirmed, pending: Vec::new(), + op_manager, } } @@ -831,17 +843,31 @@ impl ConnectionHandler for Handler { } _ => break, }, - Left(msg) => match Sink::start_send(Pin::new(&mut substream), msg) { - Ok(()) => { - stream = SubstreamState::PendingFlush { substream, conn_id }; + Left(msg) => { + let op_id = msg.id(); + if msg.track_stats() { + if let Some(mut op) = self.op_manager.pop(op_id) { + op.record_transfer(); + let _ = self.op_manager.push(*op_id, op); + } } - Err(err) => { - let event = ConnectionHandlerEvent::NotifyBehaviour( - HandlerEvent::Inbound(Right(NodeEvent::Error(err))), - ); - return Poll::Ready(event); + let op_id = *op_id; + match Sink::start_send(Pin::new(&mut substream), msg) { + Ok(()) => { + stream = SubstreamState::PendingFlush { + substream, + conn_id, + op_id: Some(op_id), + }; + } + Err(err) => { + let event = ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::Inbound(Right(NodeEvent::Error(err))), + ); + return Poll::Ready(event); + } } - }, + } }, Poll::Pending => { stream = SubstreamState::PendingSend { @@ -861,14 +887,24 @@ impl ConnectionHandler for Handler { SubstreamState::PendingFlush { mut substream, conn_id, + op_id, } => match Sink::poll_flush(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => { + if let Some(op_id) = op_id { + if let Some(mut op) = self.op_manager.pop(&op_id) { + op.record_transfer(); + let _ = self.op_manager.push(op_id, op); + } + } stream = SubstreamState::WaitingMsg { substream, conn_id }; continue; } Poll::Pending => { - self.substreams - .push(SubstreamState::PendingFlush { substream, conn_id }); + self.substreams.push(SubstreamState::PendingFlush { + substream, + conn_id, + op_id, + }); break; } Poll::Ready(Err(err)) => { @@ -883,6 +919,11 @@ impl ConnectionHandler for Handler { conn_id, } => match Stream::poll_next(Pin::new(&mut substream), cx) { Poll::Ready(Some(Ok(msg))) => { + let op_id = msg.id(); + if let Some(mut op) = self.op_manager.pop(op_id) { + op.record_transfer(); + let _ = self.op_manager.push(*op_id, op); + } if !msg.terminal() { // received a message, the other peer is waiting for an answer self.substreams diff --git a/crates/core/src/node/op_state.rs b/crates/core/src/node/op_state.rs index c3a5658f1..047bcc1cc 100644 --- a/crates/core/src/node/op_state.rs +++ b/crates/core/src/node/op_state.rs @@ -34,6 +34,7 @@ pub(crate) struct OpManager { pub ring: Ring, } +#[cfg(debug_assertions)] macro_rules! check_id_op { ($get_ty:expr, $var:path) => { if !matches!($get_ty, $var) { @@ -108,18 +109,22 @@ impl OpManager { pub fn push(&self, id: Transaction, op: OpEnum) -> Result<(), OpError> { match op { OpEnum::JoinRing(tx) => { + #[cfg(debug_assertions)] check_id_op!(id.tx_type(), TransactionType::JoinRing); self.join_ring.insert(id, *tx); } OpEnum::Put(tx) => { + #[cfg(debug_assertions)] check_id_op!(id.tx_type(), TransactionType::Put); self.put.insert(id, tx); } OpEnum::Get(tx) => { + #[cfg(debug_assertions)] check_id_op!(id.tx_type(), TransactionType::Get); self.get.insert(id, tx); } OpEnum::Subscribe(tx) => { + #[cfg(debug_assertions)] check_id_op!(id.tx_type(), TransactionType::Subscribe); self.subscribe.insert(id, tx); } diff --git a/crates/core/src/node/p2p_impl.rs b/crates/core/src/node/p2p_impl.rs index 9c2b93a35..4e44ae477 100644 --- a/crates/core/src/node/p2p_impl.rs +++ b/crates/core/src/node/p2p_impl.rs @@ -32,7 +32,7 @@ use super::OpManager; pub(super) struct NodeP2P { pub(crate) peer_key: PeerKey, - pub(crate) op_storage: Arc, + pub(crate) op_manager: Arc, notification_channel: EventLoopNotifications, pub(super) conn_manager: P2pConnManager, // event_listener: Option>, @@ -54,7 +54,7 @@ impl NodeP2P { None, self.peer_key, gateway, - &self.op_storage, + &self.op_manager, &mut self.conn_manager.bridge, ) .await?; @@ -67,7 +67,7 @@ impl NodeP2P { // todo: pass `cli_response_sender` self.conn_manager .run_event_listener( - self.op_storage.clone(), + self.op_manager.clone(), self.notification_channel, self.executor_listener, self.cli_response_sender, @@ -85,11 +85,6 @@ impl NodeP2P { let peer_key = PeerKey::from(builder.local_key.public()); let gateways = builder.get_gateways()?; - let conn_manager = { - let transport = Self::config_transport(&builder.local_key)?; - P2pConnManager::build(transport, &builder)? - }; - let ring = Ring::new(&builder, &gateways)?; let (notification_channel, notification_tx) = EventLoopNotifications::channel(); let (ch_outbound, ch_inbound) = contract::contract_handler_channel(); @@ -100,6 +95,11 @@ impl NodeP2P { .await .map_err(|e| anyhow::anyhow!(e))?; + let conn_manager = { + let transport = Self::config_transport(&builder.local_key)?; + P2pConnManager::build(transport, &builder, op_storage.clone())? + }; + GlobalExecutor::spawn(contract::contract_handling(contract_handler)); let clients = ClientEventsCombinator::new(builder.clients); GlobalExecutor::spawn(client_event_handling( @@ -112,7 +112,7 @@ impl NodeP2P { peer_key, conn_manager, notification_channel, - op_storage, + op_manager: op_storage, is_gateway: builder.location.is_some(), executor_listener, cli_response_sender, diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index 9c9576331..53e953442 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use tokio::sync::mpsc::error::SendError; use self::op_trait::Operation; @@ -6,8 +8,7 @@ use crate::{ contract::ContractError, message::{InnerMessage, Message, Transaction, TransactionType}, node::{ConnectionBridge, ConnectionError, OpManager, PeerKey}, - operations::{get::GetOp, join_ring::JoinRingOp, put::PutOp, subscribe::SubscribeOp}, - ring::RingError, + ring::{Location, PeerKeyLocation, RingError}, }; pub(crate) mod get; @@ -36,7 +37,7 @@ pub(crate) async fn handle_op_request( client_id: Option, ) -> Result, OpError> where - Op: Operation, + Op: Operation, CB: ConnectionBridge, { let sender; @@ -92,7 +93,7 @@ where Ok(OperationResult { return_msg: None, state: Some(final_state), - }) if final_state.is_final() => { + }) if final_state.finalized() => { // operation finished_completely with result return Ok(Some(final_state)); } @@ -101,7 +102,7 @@ where state: Some(updated_state), }) => { // interim state - let id = OpEnum::id::(&updated_state); + let id = *updated_state.id(); op_storage.push(id, updated_state)?; } Ok(OperationResult { @@ -131,23 +132,17 @@ pub(crate) enum OpEnum { } impl OpEnum { - fn id(&self) -> Transaction { - use OpEnum::*; - match self { - JoinRing(op) => *>::id(op), - Put(op) => *>::id(op), - Get(op) => *>::id(op), - Subscribe(op) => *>::id(op), - } - } - - fn is_final(&self) -> bool { - match self { - OpEnum::JoinRing(op) if op.finished() => true, - OpEnum::Put(op) if op.finished() => true, - OpEnum::Get(op) if op.finished() => true, - OpEnum::Subscribe(op) if op.finished() => true, - _ => false, + delegate::delegate! { + to match self { + OpEnum::JoinRing(op) => op, + OpEnum::Put(op) => op, + OpEnum::Get(op) => op, + OpEnum::Subscribe(op) => op, + } { + pub fn id(&self) -> &Transaction; + pub fn outcome(&self) -> OpOutcome; + pub fn finalized(&self) -> bool; + pub fn record_transfer(&mut self); } } @@ -156,6 +151,30 @@ impl OpEnum { } } +pub(crate) enum OpOutcome<'a> { + /// An op which involves a contract completed successfully. + ContractOpSuccess { + target_peer: &'a PeerKeyLocation, + contract_location: Location, + /// Time the operation took to initiate. + first_response_time: Duration, + /// Size of the payload (contract, state, etc.) in bytes. + payload_size: usize, + /// Transfer time of the payload. + payload_transfer_time: Duration, + }, + // todo: handle failures stats when it does not complete successfully + // /// An op which involves a contract completed unsuccessfully. + // ContractOpFailure { + // target_peer: Option<&'a PeerKeyLocation>, + // contract_location: Location, + // }, + /// In transit contract operation. + Incomplete, + /// This operation stats are not relevant for this peer. + Irrelevant, +} + #[derive(Debug, thiserror::Error)] pub(crate) enum OpError { #[error(transparent)] diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs index 8f5c3fc0c..e8227e5d0 100644 --- a/crates/core/src/operations/get.rs +++ b/crates/core/src/operations/get.rs @@ -1,6 +1,6 @@ -use std::future::Future; use std::pin::Pin; use std::time::Duration; +use std::{future::Future, time::Instant}; use freenet_stdlib::prelude::*; @@ -15,7 +15,7 @@ use crate::{ DynError, }; -use super::{OpEnum, OpError, OperationResult}; +use super::{OpEnum, OpError, OpOutcome, OperationResult}; pub(crate) use self::messages::GetMsg; @@ -30,15 +30,97 @@ pub(crate) struct GetOp { id: Transaction, state: Option, result: Option, + stats: Option, _ttl: Duration, } + +struct GetStats { + caching_peer: Option, + contract_location: Location, + /// (start, end) + first_response_time: Option<(Instant, Option)>, + /// (start, end) + transfer_time: Option<(Instant, Option)>, + step: RecordingStats, +} + +/// While timing, at what particular step we are now. +#[derive(Clone, Copy, Default)] +enum RecordingStats { + #[default] + Uninitialized, + InitGet, + TransferNotStarted, + TransferStarted, + Completed, +} + impl GetOp { - pub(super) fn finished(&self) -> bool { - self.result.is_some() + pub(super) fn outcome(&self) -> OpOutcome { + if let Some(( + GetResult { state, contract }, + GetStats { + caching_peer: Some(target_peer), + contract_location, + first_response_time: Some((response_start, Some(response_end))), + transfer_time: Some((transfer_start, Some(transfer_end))), + .. + }, + )) = self.result.as_ref().zip(self.stats.as_ref()) + { + let payload_size = state.size() + + contract + .as_ref() + .map(|c| c.data().len()) + .unwrap_or_default(); + OpOutcome::ContractOpSuccess { + target_peer, + contract_location: *contract_location, + payload_size, + first_response_time: *response_end - *response_start, + payload_transfer_time: *transfer_end - *transfer_start, + } + } else { + OpOutcome::Incomplete + } + } + + pub(super) fn finalized(&self) -> bool { + self.stats + .as_ref() + .map(|s| s.transfer_time.is_some()) + .unwrap_or(false) + } + + pub(super) fn record_transfer(&mut self) { + if let Some(stats) = self.stats.as_mut() { + match stats.step { + RecordingStats::Uninitialized => { + stats.first_response_time = Some((Instant::now(), None)); + stats.step = RecordingStats::InitGet; + } + RecordingStats::InitGet => { + if let Some((_, e)) = stats.first_response_time.as_mut() { + *e = Some(Instant::now()); + } + stats.step = RecordingStats::TransferNotStarted; + } + RecordingStats::TransferNotStarted => { + stats.transfer_time = Some((Instant::now(), None)); + stats.step = RecordingStats::TransferStarted; + } + RecordingStats::TransferStarted => { + if let Some((_, e)) = stats.transfer_time.as_mut() { + *e = Some(Instant::now()); + } + stats.step = RecordingStats::Completed; + } + RecordingStats::Completed => {} + } + } } } -#[allow(dead_code)] pub(crate) struct GetResult { pub state: WrappedState, pub contract: Option, @@ -55,10 +137,7 @@ impl TryFrom for GetResult { } } -impl Operation for GetOp -where - CB: std::marker::Send, -{ +impl Operation for GetOp { type Message = GetMsg; type Result = GetResult; @@ -74,7 +153,7 @@ where let result = match op_storage.pop(msg.id()) { Some(OpEnum::Get(get_op)) => { Ok(OpInitialization { op: get_op, sender }) - // was an existing operation, the other peer messaged back + // was an existing operation, other peer messaged back } Some(_) => return Err(OpError::OpNotPresent(tx)), None => { @@ -84,6 +163,7 @@ where state: Some(GetState::ReceivedRequest), id: tx, result: None, + stats: None, // don't care about stats in target peers _ttl: PEER_TIMEOUT, }, sender, @@ -98,7 +178,7 @@ where &self.id } - fn process_message<'a>( + fn process_message<'a, CB: ConnectionBridge>( self, conn_manager: &'a mut CB, op_storage: &'a OpManager, @@ -109,6 +189,7 @@ where let return_msg; let new_state; let mut result = None; + let mut stats = self.stats; match input { GetMsg::RequestGet { @@ -124,6 +205,13 @@ where )); tracing::debug!("Seek contract {} @ {} (tx: {})", key, target.peer, id); new_state = self.state; + stats = Some(GetStats { + contract_location: Location::from(&key), + caching_peer: None, + transfer_time: None, + first_response_time: None, + step: Default::default(), + }); return_msg = Some(GetMsg::SeekNode { key, id, @@ -142,6 +230,10 @@ where htl, } => { let is_cached_contract = op_storage.ring.is_contract_cached(&key); + if let Some(s) = stats.as_mut() { + s.caching_peer = Some(target); + } + if !is_cached_contract { tracing::warn!( "Contract `{}` not found while processing a get request at node @ {}", @@ -170,6 +262,7 @@ where target: sender, // return to requester }), None, + stats, self._ttl, ); } @@ -274,6 +367,7 @@ where fetch_contract, .. }) => { + // todo: register in the stats for the outcome of the op that failed to get a response from this peer if retries < MAX_RETRIES { // no response received from this peer, so skip it in the next iteration skip_list.push(target.peer); @@ -367,6 +461,7 @@ where state: self.state, result: None, _ttl: self._ttl, + stats, }; op_storage @@ -441,7 +536,7 @@ where _ => return Err(OpError::UnexpectedOpState), } - build_op_result(self.id, new_state, return_msg, result, self._ttl) + build_op_result(self.id, new_state, return_msg, result, stats, self._ttl) }) } } @@ -451,12 +546,14 @@ fn build_op_result( state: Option, msg: Option, result: Option, + stats: Option, ttl: Duration, ) -> Result { let output_op = Some(GetOp { id, state, result, + stats, _ttl: ttl, }); Ok(OperationResult { @@ -510,14 +607,11 @@ fn check_contract_found( } } -pub(crate) fn start_op(key: ContractKey, fetch_contract: bool, id: &PeerKey) -> GetOp { - tracing::debug!( - "Requesting get contract {} @ loc({})", - key, - Location::from(&key) - ); +pub(crate) fn start_op(key: ContractKey, fetch_contract: bool, this_peer: &PeerKey) -> GetOp { + let contract_location = Location::from(&key); + tracing::debug!("Requesting get contract {} @ loc({contract_location})", key,); - let id = Transaction::new(::tx_type_id(), id); + let id = Transaction::new(::tx_type_id(), this_peer); let state = Some(GetState::PrepareRequest { key, id, @@ -527,11 +621,17 @@ pub(crate) fn start_op(key: ContractKey, fetch_contract: bool, id: &PeerKey) -> id, state, result: None, + stats: Some(GetStats { + contract_location, + caching_peer: None, + transfer_time: None, + first_response_time: None, + step: Default::default(), + }), _ttl: PEER_TIMEOUT, } } -#[derive(PartialEq, Eq, Debug, Clone)] enum GetState { /// A new petition for a get op. ReceivedRequest, @@ -555,19 +655,18 @@ pub(crate) async fn request_get( get_op: GetOp, client_id: Option, ) -> Result<(), OpError> { - let (target, id) = if let Some(GetState::PrepareRequest { key, id, .. }) = get_op.state.clone() - { + let (target, id) = if let Some(GetState::PrepareRequest { key, id, .. }) = &get_op.state { // the initial request must provide: // - a location in the network where the contract resides // - and the key of the contract value to get ( op_storage .ring - .closest_caching(&key, 1, &[]) + .closest_caching(key, 1, &[]) .into_iter() .next() .ok_or(RingError::EmptyRing)?, - id, + *id, ) } else { return Err(OpError::UnexpectedOpState); @@ -578,7 +677,7 @@ pub(crate) async fn request_get( id ); - match get_op.state.clone() { + match get_op.state { Some(GetState::PrepareRequest { fetch_contract, key, @@ -591,22 +690,26 @@ pub(crate) async fn request_get( fetch_contract, }); - let msg = Some(GetMsg::RequestGet { + let msg = GetMsg::RequestGet { id, key, target, fetch_contract, - }); + }; let op = GetOp { id, state: new_state, result: None, + stats: get_op.stats.map(|mut s| { + s.caching_peer = Some(target); + s + }), _ttl: get_op._ttl, }; op_storage - .notify_op_change(msg.map(Message::from).unwrap(), OpEnum::Get(op), client_id) + .notify_op_change(Message::from(msg), OpEnum::Get(op), client_id) .await?; } _ => return Err(OpError::InvalidStateTransition(get_op.id)), @@ -625,11 +728,6 @@ mod messages { #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub(crate) enum GetMsg { - /// Internal node call to route to a peer close to the contract. - FetchRouting { - id: Transaction, - target: PeerKeyLocation, - }, RequestGet { id: Transaction, target: PeerKeyLocation, @@ -656,7 +754,6 @@ mod messages { impl InnerMessage for GetMsg { fn id(&self) -> &Transaction { match self { - Self::FetchRouting { id, .. } => id, Self::RequestGet { id, .. } => id, Self::SeekNode { id, .. } => id, Self::ReturnGet { id, .. } => id, @@ -674,7 +771,6 @@ mod messages { pub fn target(&self) -> Option<&PeerKeyLocation> { match self { - Self::FetchRouting { target, .. } => Some(target), Self::SeekNode { target, .. } => Some(target), Self::RequestGet { target, .. } => Some(target), Self::ReturnGet { target, .. } => Some(target), @@ -683,7 +779,7 @@ mod messages { pub fn terminal(&self) -> bool { use GetMsg::*; - matches!(self, ReturnGet { .. } | SeekNode { .. }) + matches!(self, ReturnGet { .. }) } } @@ -691,7 +787,6 @@ mod messages { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let id = self.id(); match self { - Self::FetchRouting { .. } => write!(f, "FetchRouting(id: {id})"), Self::RequestGet { .. } => write!(f, "RequestGet(id: {id})"), Self::SeekNode { .. } => write!(f, "SeekNode(id: {id})"), Self::ReturnGet { .. } => write!(f, "ReturnGet(id: {id})"), diff --git a/crates/core/src/operations/join_ring.rs b/crates/core/src/operations/join_ring.rs index ca20df520..876549dec 100644 --- a/crates/core/src/operations/join_ring.rs +++ b/crates/core/src/operations/join_ring.rs @@ -2,7 +2,7 @@ use futures::Future; use std::pin::Pin; use std::{collections::HashSet, time::Duration}; -use super::{OpError, OperationResult}; +use super::{OpError, OpOutcome, OperationResult}; use crate::operations::op_trait::Operation; use crate::operations::OpInitialization; use crate::{ @@ -34,9 +34,15 @@ impl JoinRingOp { self.backoff.is_some() } - pub(super) fn finished(&self) -> bool { - todo!() + pub(super) fn outcome(&self) -> OpOutcome { + OpOutcome::Irrelevant + } + + pub(super) fn finalized(&self) -> bool { + matches!(self.state, Some(JRState::Connected)) } + + pub(super) fn record_transfer(&mut self) {} } pub(crate) struct JoinRingResult {} @@ -49,7 +55,7 @@ impl TryFrom for JoinRingResult { } } -impl Operation for JoinRingOp { +impl Operation for JoinRingOp { type Message = JoinRingMsg; type Result = JoinRingResult; @@ -89,7 +95,7 @@ impl Operation for JoinRingOp { &self.id } - fn process_message<'a>( + fn process_message<'a, CB: ConnectionBridge>( self, conn_manager: &'a mut CB, op_storage: &'a OpManager, @@ -292,9 +298,11 @@ impl Operation for JoinRingOp { } _ => return Err(OpError::InvalidStateTransition(self.id)), }; - if let Some(state) = new_state.clone() { + if let Some(state) = new_state { if state.is_connected() { new_state = None; + } else { + new_state = Some(state); } }; } @@ -447,9 +455,11 @@ impl Operation for JoinRingOp { } _ => return Err(OpError::InvalidStateTransition(self.id)), } - if let Some(state) = new_state.clone() { + if let Some(state) = new_state { if state.is_connected() { new_state = None; + } else { + new_state = Some(state) } }; } @@ -471,7 +481,7 @@ impl Operation for JoinRingOp { } _ => return Err(OpError::InvalidStateTransition(self.id)), } - if let Some(state) = new_state.clone() { + if let Some(state) = new_state { if !state.is_connected() { return Err(OpError::InvalidStateTransition(id)); } else { @@ -494,7 +504,7 @@ impl Operation for JoinRingOp { } _ => return Err(OpError::InvalidStateTransition(self.id)), }; - if let Some(state) = new_state.clone() { + if let Some(state) = new_state { if !state.is_connected() { return Err(OpError::InvalidStateTransition(id)); } else { @@ -626,7 +636,6 @@ mod states { } } -#[derive(Debug, Clone)] enum JRState { Initializing, Connecting(ConnectionInfo), @@ -704,21 +713,23 @@ pub(crate) async fn join_ring_request( tx: Transaction, op_storage: &OpManager, conn_manager: &mut CB, - mut join_op: JoinRingOp, + join_op: JoinRingOp, ) -> Result<(), OpError> where CB: ConnectionBridge, { + let JoinRingOp { + id, + state, + backoff, + _ttl, + .. + } = join_op; let ConnectionInfo { gateway, this_peer, max_hops_to_live, - } = join_op - .state - .as_mut() - .expect("Infallible") - .clone() - .try_unwrap_connecting()?; + } = state.expect("infallible").try_unwrap_connecting()?; tracing::info!( "Joining ring via {} (at {}) (tx: {})", @@ -738,7 +749,20 @@ where }, }); conn_manager.send(&gateway.peer, join_req).await?; - op_storage.push(tx, OpEnum::JoinRing(Box::new(join_op)))?; + op_storage.push( + tx, + OpEnum::JoinRing(Box::new(JoinRingOp { + id, + state: Some(JRState::Connecting(ConnectionInfo { + gateway, + this_peer, + max_hops_to_live, + })), + gateway: Box::new(gateway), + backoff, + _ttl, + })), + )?; Ok(()) } diff --git a/crates/core/src/operations/op_trait.rs b/crates/core/src/operations/op_trait.rs index d5a4d63f5..646fec42b 100644 --- a/crates/core/src/operations/op_trait.rs +++ b/crates/core/src/operations/op_trait.rs @@ -7,11 +7,11 @@ use futures::Future; use crate::{ client_events::ClientId, message::{InnerMessage, Transaction}, - node::OpManager, + node::{ConnectionBridge, OpManager}, operations::{OpError, OpInitialization, OperationResult}, }; -pub(crate) trait Operation +pub(crate) trait Operation where Self: Sized + TryInto, { @@ -24,12 +24,10 @@ where msg: &Self::Message, ) -> Result, OpError>; - // fn new(transaction: Transaction, builder: Self::Builder) -> Self; - fn id(&self) -> &Transaction; #[allow(clippy::type_complexity)] - fn process_message<'a>( + fn process_message<'a, CB: ConnectionBridge>( self, conn_manager: &'a mut CB, op_storage: &'a OpManager, diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 9c6a61a72..60e20361e 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -2,15 +2,15 @@ //! a given radius will cache a copy of the contract and it's current value, //! as well as will broadcast updates to the contract value to all subscribers. -use std::collections::HashSet; use std::future::Future; use std::pin::Pin; use std::time::Duration; +use std::{collections::HashSet, time::Instant}; pub(crate) use self::messages::PutMsg; use freenet_stdlib::prelude::*; -use super::{OpEnum, OpError, OperationResult}; +use super::{OpEnum, OpError, OpOutcome, OperationResult}; use crate::{ client_events::ClientId, config::PEER_TIMEOUT, @@ -24,28 +24,104 @@ use crate::{ pub(crate) struct PutOp { id: Transaction, state: Option, + stats: Option, /// time left until time out, when this reaches zero it will be removed from the state _ttl: Duration, - done: bool, } impl PutOp { - pub(super) fn finished(&self) -> bool { - self.done + pub(super) fn outcome(&self) -> OpOutcome { + match &self.stats { + Some(PutStats { + contract_location, + payload_size, + // first_response_time: Some((response_start, Some(response_end))), + transfer_time: Some((transfer_start, Some(transfer_end))), + target: Some(target), + .. + }) => { + let payload_transfer_time = *transfer_end - *transfer_start; + // todo: check if this is correct + // in puts both times are equivalent since when the transfer is initialized + // it already contains the payload + let first_response_time = payload_transfer_time.clone(); + OpOutcome::ContractOpSuccess { + target_peer: target, + contract_location: *contract_location, + payload_size: *payload_size, + payload_transfer_time, + first_response_time, + } + } + Some(_) => OpOutcome::Incomplete, + None => OpOutcome::Irrelevant, + } + } + + pub(super) fn finalized(&self) -> bool { + self.stats + .as_ref() + .map(|s| matches!(s.step, RecordingStats::Completed)) + .unwrap_or(false) + } + + pub(super) fn record_transfer(&mut self) { + if let Some(stats) = self.stats.as_mut() { + match stats.step { + RecordingStats::Uninitialized => { + stats.transfer_time = Some((Instant::now(), None)); + stats.step = RecordingStats::InitPut; + } + RecordingStats::InitPut => { + if let Some((_, e)) = stats.transfer_time.as_mut() { + *e = Some(Instant::now()); + } + stats.step = RecordingStats::Completed; + } + RecordingStats::Completed => {} + } + } } } +struct PutStats { + contract_location: Location, + payload_size: usize, + // /// (start, end) + // first_response_time: Option<(Instant, Option)>, + /// (start, end) + transfer_time: Option<(Instant, Option)>, + target: Option, + step: RecordingStats, +} + +/// While timing, at what particular step we are now. +#[derive(Clone, Copy, Default)] +enum RecordingStats { + #[default] + Uninitialized, + InitPut, + Completed, +} + pub(crate) struct PutResult {} impl TryFrom for PutResult { type Error = OpError; - fn try_from(_value: PutOp) -> Result { - todo!() + fn try_from(op: PutOp) -> Result { + if let Some(true) = op + .stats + .map(|s| matches!(s.step, RecordingStats::Completed)) + { + Ok(PutResult {}) + } else { + Err(OpError::UnexpectedOpState) + } } } -impl Operation for PutOp { +impl Operation for PutOp { type Message = PutMsg; type Result = PutResult; @@ -70,7 +146,7 @@ impl Operation for PutOp { Ok(OpInitialization { op: Self { state: Some(PutState::ReceivedRequest), - done: false, + stats: None, // don't care for stats in the target peers id: tx, _ttl: PEER_TIMEOUT, }, @@ -85,7 +161,7 @@ impl Operation for PutOp { &self.id } - fn process_message<'a>( + fn process_message<'a, CB: ConnectionBridge>( self, conn_manager: &'a mut CB, op_storage: &'a OpManager, @@ -95,7 +171,7 @@ impl Operation for PutOp { Box::pin(async move { let return_msg; let new_state; - let mut done = false; + let stats = self.stats; match input { PutMsg::RequestPut { @@ -339,7 +415,6 @@ impl Operation for PutOp { tracing::debug!("Successfully updated value for {}", contract,); new_state = None; return_msg = None; - done = true; } _ => return Err(OpError::InvalidStateTransition(self.id)), }; @@ -405,7 +480,7 @@ impl Operation for PutOp { _ => return Err(OpError::UnexpectedOpState), } - build_op_result(self.id, new_state, return_msg, self._ttl, done) + build_op_result(self.id, new_state, return_msg, self._ttl, stats) }) } } @@ -415,12 +490,12 @@ fn build_op_result( state: Option, msg: Option, ttl: Duration, - done: bool, + stats: Option, ) -> Result { let output_op = Some(PutOp { id, state, - done, + stats, _ttl: ttl, }); Ok(OperationResult { @@ -488,7 +563,7 @@ async fn try_to_broadcast( let op = PutOp { id, state: new_state, - done: false, + stats: None, _ttl: ttl, }; op_storage @@ -514,15 +589,15 @@ pub(crate) fn start_op( peer: &PeerKey, ) -> PutOp { let key = contract.key(); + let contract_location = Location::from(&key); tracing::debug!( - "Requesting put to contract {} @ loc({})", + "Requesting put to contract {} @ loc({contract_location})", key, - Location::from(&key) ); let id = Transaction::new(::tx_type_id(), peer); + let payload_size = contract.data().len(); let state = Some(PutState::PrepareRequest { - id, contract, value, htl, @@ -531,16 +606,21 @@ pub(crate) fn start_op( PutOp { id, state, - done: false, + stats: Some(PutStats { + contract_location, + payload_size, + target: None, + // first_response_time: None, + transfer_time: None, + step: Default::default(), + }), _ttl: PEER_TIMEOUT, } } -#[derive(PartialEq, Eq, Debug, Clone)] enum PutState { ReceivedRequest, PrepareRequest { - id: Transaction, contract: ContractContainer, value: WrappedState, htl: usize, @@ -554,10 +634,10 @@ enum PutState { /// Request to insert/update a value into a contract. pub(crate) async fn request_put( op_storage: &OpManager, - put_op: PutOp, + mut put_op: PutOp, client_id: Option, ) -> Result<(), OpError> { - let key = if let Some(PutState::PrepareRequest { contract, .. }) = put_op.state.clone() { + let key = if let Some(PutState::PrepareRequest { contract, .. }) = &put_op.state { contract.key() } else { return Err(OpError::UnexpectedOpState); @@ -576,8 +656,11 @@ pub(crate) async fn request_put( .ok_or(RingError::EmptyRing)?; let id = put_op.id; + if let Some(stats) = &mut put_op.stats { + stats.target = Some(target); + } - match put_op.state.clone() { + match put_op.state { Some(PutState::PrepareRequest { contract, value, @@ -586,23 +669,23 @@ pub(crate) async fn request_put( }) => { let key = contract.key(); let new_state = Some(PutState::AwaitingResponse { contract: key }); - let msg = Some(PutMsg::RequestPut { + let msg = PutMsg::RequestPut { id, contract, value, htl, target, - }); + }; let op = PutOp { state: new_state, id, - done: false, + stats: put_op.stats, _ttl: put_op._ttl, }; op_storage - .notify_op_change(msg.map(Message::from).unwrap(), OpEnum::Put(op), client_id) + .notify_op_change(Message::from(msg), OpEnum::Put(op), client_id) .await?; } _ => return Err(OpError::InvalidStateTransition(put_op.id)), diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 726eaad65..8bc768bfb 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -15,7 +15,7 @@ use crate::{ ring::{PeerKeyLocation, RingError}, }; -use super::{OpEnum, OpError, OperationResult}; +use super::{OpEnum, OpError, OpOutcome, OperationResult}; pub(crate) use self::messages::SubscribeMsg; @@ -28,9 +28,15 @@ pub(crate) struct SubscribeOp { } impl SubscribeOp { - pub fn finished(&self) -> bool { - todo!() + pub(super) fn outcome(&self) -> OpOutcome { + OpOutcome::Irrelevant + } + + pub(super) fn finalized(&self) -> bool { + matches!(self.state, Some(SubscribeState::Completed)) } + + pub(super) fn record_transfer(&mut self) {} } pub(crate) enum SubscribeResult {} @@ -43,7 +49,7 @@ impl TryFrom for SubscribeResult { } } -impl Operation for SubscribeOp { +impl Operation for SubscribeOp { type Message = SubscribeMsg; type Result = SubscribeResult; @@ -85,7 +91,7 @@ impl Operation for SubscribeOp { &self.id } - fn process_message<'a>( + fn process_message<'a, CB: ConnectionBridge>( self, conn_manager: &'a mut CB, op_storage: &'a OpManager, @@ -300,7 +306,6 @@ pub(crate) fn start_op(key: ContractKey, peer: &PeerKey) -> SubscribeOp { } } -#[derive(PartialEq, Eq, Debug, Clone)] enum SubscribeState { /// Prepare the request to subscribe. PrepareRequest { @@ -323,42 +328,39 @@ pub(crate) async fn request_subscribe( sub_op: SubscribeOp, client_id: Option, ) -> Result<(), OpError> { - let (target, _id) = - if let Some(SubscribeState::PrepareRequest { id, key }) = sub_op.state.clone() { - if !op_storage.ring.is_contract_cached(&key) { - return Err(OpError::ContractError(ContractError::ContractNotFound(key))); - } - ( - op_storage - .ring - .closest_caching(&key, 1, &[]) - .into_iter() - .next() - .ok_or(RingError::EmptyRing)?, - id, - ) - } else { - return Err(OpError::UnexpectedOpState); - }; - - match sub_op.state.clone() { + let (target, _id) = if let Some(SubscribeState::PrepareRequest { id, key }) = &sub_op.state { + if !op_storage.ring.is_contract_cached(key) { + return Err(OpError::ContractError(ContractError::ContractNotFound( + key.clone(), + ))); + } + ( + op_storage + .ring + .closest_caching(key, 1, &[]) + .into_iter() + .next() + .ok_or(RingError::EmptyRing)?, + *id, + ) + } else { + return Err(OpError::UnexpectedOpState); + }; + + match sub_op.state { Some(SubscribeState::PrepareRequest { id, key, .. }) => { let new_state = Some(SubscribeState::AwaitingResponse { skip_list: vec![], retries: 0, }); - let msg = Some(SubscribeMsg::RequestSub { id, key, target }); + let msg = SubscribeMsg::RequestSub { id, key, target }; let op = SubscribeOp { id, state: new_state, _ttl: sub_op._ttl, }; op_storage - .notify_op_change( - msg.map(Message::from).unwrap(), - OpEnum::Subscribe(op), - client_id, - ) + .notify_op_change(Message::from(msg), OpEnum::Subscribe(op), client_id) .await?; } _ => return Err(OpError::InvalidStateTransition(sub_op.id)), diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 49e5efe66..42339191f 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -17,7 +17,7 @@ impl TryFrom for UpdateResult { } } -impl Operation for UpdateOp { +impl Operation for UpdateOp { type Message = UpdateMsg; type Result = UpdateResult; @@ -32,7 +32,7 @@ impl Operation for UpdateOp { todo!() } - fn process_message<'a>( + fn process_message<'a, CB: ConnectionBridge>( self, _conn_manager: &'a mut CB, _op_storage: &'a crate::node::OpManager, diff --git a/crates/core/src/ring.rs b/crates/core/src/ring.rs index 3227fdeb4..6a655d341 100644 --- a/crates/core/src/ring.rs +++ b/crates/core/src/ring.rs @@ -27,7 +27,10 @@ use freenet_stdlib::prelude::ContractKey; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; -use crate::node::{self, NodeBuilder, PeerKey}; +use crate::{ + node::{self, NodeBuilder, PeerKey}, + router::Router, +}; #[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] /// The location of a peer in the ring. This location allows routing towards the peer. @@ -65,6 +68,7 @@ pub(crate) struct Ring { pub peer_key: PeerKey, max_connections: usize, min_connections: usize, + router: Arc>, connections_by_location: Arc>>, location_for_peer: Arc>>, /// contracts in the ring cached by this node @@ -143,11 +147,14 @@ impl Ring { Self::MAX_CONNECTIONS }; + let router = Router::new(&[]); + let ring = Ring { rnd_if_htl_above, max_hops_to_live, max_connections, min_connections, + router: Arc::new(RwLock::new(router)), connections_by_location: Arc::new(RwLock::new(BTreeMap::new())), location_for_peer: Arc::new(RwLock::new(BTreeMap::new())), cached_contracts: DashSet::new(), @@ -318,6 +325,10 @@ impl Ring { iter.collect() } + pub fn routing_finished(&self, event: crate::router::RouteEvent) { + self.router.write().add_event(event); + } + /// Get a random peer from the known ring connections. pub fn random_peer(&self, filter_fn: F) -> Option where diff --git a/crates/core/src/router.rs b/crates/core/src/router.rs index 9d6ca0689..f15ae7bdc 100644 --- a/crates/core/src/router.rs +++ b/crates/core/src/router.rs @@ -17,7 +17,7 @@ pub(crate) struct Router { } impl Router { - fn new(history: &[RouteEvent]) -> Self { + pub fn new(history: &[RouteEvent]) -> Self { let failure_outcomes: Vec = history .iter() .map(|re| IsotonicEvent { @@ -107,7 +107,7 @@ impl Router { } } - fn add_event(&mut self, event: RouteEvent) { + pub fn add_event(&mut self, event: RouteEvent) { match event.outcome { RouteOutcome::Success { time_to_response_start, @@ -143,14 +143,11 @@ impl Router { } } - pub(crate) fn select_peer<'a, I>( + pub fn select_peer<'a>( &self, - peers: I, + peers: impl IntoIterator, contract_location: &Location, - ) -> Option<&'a PeerKeyLocation> - where - I: IntoIterator, - { + ) -> Option<&'a PeerKeyLocation> { if !self.has_sufficient_historical_data() { // Find the peer with the minimum distance to the contract location, // ignoring peers with no location @@ -231,27 +228,27 @@ impl Router { } #[derive(Debug)] -pub(crate) enum RoutingError { +enum RoutingError { InsufficientDataError, } #[derive(Debug, Clone, Copy, Serialize)] -pub(crate) struct RoutingPrediction { - pub failure_probability: f64, - pub xfer_speed: TransferSpeed, - pub time_to_response_start: f64, - pub expected_total_time: f64, +struct RoutingPrediction { + failure_probability: f64, + xfer_speed: TransferSpeed, + time_to_response_start: f64, + expected_total_time: f64, } #[derive(Debug, Clone, Copy, Serialize)] pub(crate) struct RouteEvent { - peer: PeerKeyLocation, - contract_location: Location, - outcome: RouteOutcome, + pub peer: PeerKeyLocation, + pub contract_location: Location, + pub outcome: RouteOutcome, } #[derive(Debug, Clone, Copy, Serialize)] -pub(crate) enum RouteOutcome { +pub enum RouteOutcome { Success { time_to_response_start: Duration, payload_size: usize, diff --git a/crates/core/src/router/isotonic_estimator.rs b/crates/core/src/router/isotonic_estimator.rs index 300854e11..4a08b0bc8 100644 --- a/crates/core/src/router/isotonic_estimator.rs +++ b/crates/core/src/router/isotonic_estimator.rs @@ -13,9 +13,9 @@ const MIN_POINTS_FOR_REGRESSION: usize = 5; /// outcome of the peer's previous requests. #[derive(Debug, Clone, Serialize)] -pub(crate) struct IsotonicEstimator { - pub(crate) global_regression: IsotonicRegression, - pub(crate) peer_adjustments: HashMap, +pub(super) struct IsotonicEstimator { + pub global_regression: IsotonicRegression, + pub peer_adjustments: HashMap, } impl IsotonicEstimator { @@ -148,7 +148,7 @@ impl IsotonicEstimator { } } -pub(crate) enum EstimatorType { +pub(super) enum EstimatorType { /// Where the estimated value is expected to increase as distance increases Positive, /// Where the estimated value is expected to decrease as distance increases @@ -156,14 +156,14 @@ pub(crate) enum EstimatorType { } #[derive(Debug, PartialEq, Eq)] -pub(crate) enum EstimationError { +pub(super) enum EstimationError { InsufficientData, // Error indicating that there is not enough data for estimation } /// A routing event is a single request to a peer for a contract, and some value indicating /// the result of the request, such as the time it took to retrieve the contract. #[derive(Debug, Clone)] -pub(crate) struct IsotonicEvent { +pub(super) struct IsotonicEvent { pub peer: PeerKeyLocation, pub contract_location: Location, /// The result of the routing event, which is used to train the estimator, typically the time @@ -179,7 +179,7 @@ impl IsotonicEvent { } #[derive(Debug, Clone, Serialize)] -pub(crate) struct Adjustment { +pub(super) struct Adjustment { sum: f64, count: u64, } diff --git a/crates/core/src/router/util.rs b/crates/core/src/router/util.rs index f8fe151e0..0f78072d0 100644 --- a/crates/core/src/router/util.rs +++ b/crates/core/src/router/util.rs @@ -3,7 +3,7 @@ use std::time::Duration; use serde::Serialize; #[derive(Debug, Clone, Serialize)] -pub(crate) struct Mean { +pub(super) struct Mean { sum: f64, count: u64, } @@ -35,8 +35,8 @@ impl Default for Mean { } #[derive(Debug, Clone, Copy, Serialize)] -pub(crate) struct TransferSpeed { - pub(crate) bytes_per_second: f64, +pub(super) struct TransferSpeed { + pub bytes_per_second: f64, } impl TransferSpeed { From d965a0bde546c4c0d4f2e94c743cbd71869caf2c Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Thu, 12 Oct 2023 14:10:23 +0200 Subject: [PATCH 2/5] Log route events and reload router periodically --- crates/core/src/config.rs | 28 +- crates/core/src/node.rs | 80 ++++-- crates/core/src/node/conn_manager.rs | 8 + .../core/src/node/conn_manager/p2p_protoc.rs | 11 +- crates/core/src/node/event_log.rs | 239 +++++++++++++----- crates/core/src/node/in_memory_impl.rs | 18 +- crates/core/src/node/p2p_impl.rs | 32 ++- crates/core/src/node/tests.rs | 4 +- crates/core/src/operations/join_ring.rs | 10 + crates/core/src/operations/put.rs | 61 ++--- crates/core/src/ring.rs | 28 +- crates/core/src/router.rs | 46 ++-- crates/core/src/router/isotonic_estimator.rs | 15 +- crates/core/src/router/util.rs | 2 +- 14 files changed, 405 insertions(+), 177 deletions(-) diff --git a/crates/core/src/config.rs b/crates/core/src/config.rs index 855de5d37..f8b8155dd 100644 --- a/crates/core/src/config.rs +++ b/crates/core/src/config.rs @@ -86,6 +86,7 @@ pub struct ConfigPaths { secrets_dir: PathBuf, db_dir: PathBuf, app_data_dir: PathBuf, + event_log: PathBuf, } impl ConfigPaths { @@ -122,12 +123,21 @@ impl ConfigPaths { fs::create_dir_all(db_dir.join("local"))?; } + let event_log = app_data_dir.join("_EVENT_LOG"); + if !event_log.exists() { + fs::write(&event_log, [])?; + let mut local_file = event_log.clone(); + local_file.set_file_name("_EVENT_LOG_LOCAL"); + fs::write(local_file, [])?; + } + Ok(Self { contracts_dir, delegates_dir, secrets_dir, db_dir, app_data_dir, + event_log, }) } } @@ -176,8 +186,24 @@ impl Config { } } + pub fn event_log(&self) -> PathBuf { + if self.local_mode.load(std::sync::atomic::Ordering::SeqCst) { + let mut local_file = self.config_paths.event_log.clone(); + local_file.set_file_name("_EVENT_LOG_LOCAL"); + local_file + } else { + self.config_paths.event_log.to_owned() + } + } + pub fn get_static_conf() -> &'static Config { - CONFIG.get_or_init(|| Config::load_conf().expect("Failed to load configuration")) + CONFIG.get_or_init(|| match Config::load_conf() { + Ok(config) => config, + Err(err) => { + tracing::error!("failed while loading configuration: {err}"); + panic!("Failed while loading configuration") + } + }) } fn load_conf() -> std::io::Result { diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index fc38d6dcf..a690b41a6 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -20,10 +20,7 @@ use libp2p::{identity, multiaddr::Protocol, Multiaddr, PeerId}; #[cfg(test)] use self::in_memory_impl::NodeInMemory; -use self::{ - event_log::{EventLog, EventLogListener}, - p2p_impl::NodeP2P, -}; +use self::{event_log::EventLog, p2p_impl::NodeP2P}; use crate::{ client_events::{BoxedClient, ClientEventsProxy, ClientId, OpenRequest}, config::Config, @@ -45,6 +42,9 @@ use crate::{ use crate::operations::handle_op_request; pub(crate) use conn_manager::{ConnectionBridge, ConnectionError}; +#[cfg(test)] +pub(crate) use event_log::test_utils::TestEventListener; +pub(crate) use event_log::{EventLogRegister, EventRegister}; pub(crate) use op_state::OpManager; mod conn_manager; @@ -184,7 +184,11 @@ impl NodeBuilder { /// Builds a node using the default backend connection manager. pub async fn build(self, config: NodeConfig) -> Result { - let node = NodeP2P::build::(self, config).await?; + let event_log = event_log::EventRegister::new(); + let node = NodeP2P::build::( + self, event_log, config, + ) + .await?; Ok(Node(node)) } @@ -446,6 +450,7 @@ async fn report_result( op_storage: &OpManager, executor_callback: Option>, client_req_handler_callback: Option<(ClientId, ClientResponsesSender)>, + event_listener: &mut Box, ) { match op_result { Ok(Some(op_res)) => { @@ -463,7 +468,7 @@ async fn report_result( payload_size, payload_transfer_time, } => { - op_storage.ring.routing_finished(RouteEvent { + let event = RouteEvent { peer: *target_peer, contract_location, outcome: RouteOutcome::Success { @@ -471,7 +476,14 @@ async fn report_result( payload_size, payload_transfer_time, }, - }); + }; + if let Err(err) = event_listener + .event_received(EventLog::route_event(op_res.id(), op_storage, &event)) + .await + { + tracing::warn!("failed logging event: {err}"); + } + op_storage.ring.routing_finished(event); } // todo: handle failures, need to track timeouts and other potential failures // OpOutcome::ContractOpFailure { @@ -501,7 +513,7 @@ async fn process_message( msg: Result, op_storage: Arc, mut conn_manager: CB, - event_listener: Option>, + mut event_listener: Box, executor_callback: Option>, client_req_handler_callback: Option, client_id: Option, @@ -511,8 +523,11 @@ async fn process_message( let cli_req = client_id.zip(client_req_handler_callback); match msg { Ok(msg) => { - if let Some(mut listener) = event_listener { - listener.event_received(EventLog::new(&msg, &op_storage)); + if let Err(err) = event_listener + .event_received(EventLog::from_msg(&msg, &op_storage)) + .await + { + tracing::warn!("failed logging event: {err}"); } match msg { Message::JoinRing(op) => { @@ -524,7 +539,14 @@ async fn process_message( client_id, ) .await; - report_result(op_result, &op_storage, executor_callback, cli_req).await; + report_result( + op_result, + &op_storage, + executor_callback, + cli_req, + &mut event_listener, + ) + .await; } Message::Put(op) => { log_handling_msg!("put", *op.id(), op_storage); @@ -535,7 +557,14 @@ async fn process_message( client_id, ) .await; - report_result(op_result, &op_storage, executor_callback, cli_req).await; + report_result( + op_result, + &op_storage, + executor_callback, + cli_req, + &mut event_listener, + ) + .await; } Message::Get(op) => { log_handling_msg!("get", op.id(), op_storage); @@ -546,7 +575,14 @@ async fn process_message( client_id, ) .await; - report_result(op_result, &op_storage, executor_callback, cli_req).await; + report_result( + op_result, + &op_storage, + executor_callback, + cli_req, + &mut event_listener, + ) + .await; } Message::Subscribe(op) => { log_handling_msg!("subscribe", op.id(), op_storage); @@ -557,13 +593,27 @@ async fn process_message( client_id, ) .await; - report_result(op_result, &op_storage, executor_callback, cli_req).await; + report_result( + op_result, + &op_storage, + executor_callback, + cli_req, + &mut event_listener, + ) + .await; } _ => {} } } Err(err) => { - report_result(Err(err.into()), &op_storage, executor_callback, cli_req).await; + report_result( + Err(err.into()), + &op_storage, + executor_callback, + cli_req, + &mut event_listener, + ) + .await; } } } diff --git a/crates/core/src/node/conn_manager.rs b/crates/core/src/node/conn_manager.rs index e1c6cf73d..5ad9e25c6 100644 --- a/crates/core/src/node/conn_manager.rs +++ b/crates/core/src/node/conn_manager.rs @@ -27,6 +27,14 @@ pub(crate) type ConnResult = std::result::Result; pub(crate) trait ConnectionBridge: Send + Sync { async fn add_connection(&mut self, peer: PeerKey) -> ConnResult<()>; + // todo: LRU connection drop IF we can connect to other peer + // at least have a minimum of connection time alive to consider dropping it + + // If we get a join request and are at MAX_CONNECTIONS: + // 1. Ensure it's been N minutes since the last peer removal. + // 2. Drop the peer with the fewest outbound requests/minute that's at least M minutes old. + // This promotes peer turnover to prevent network stagnation. + async fn drop_connection(&mut self, peer: &PeerKey) -> ConnResult<()>; async fn send(&self, target: &PeerKey, msg: Message) -> ConnResult<()>; diff --git a/crates/core/src/node/conn_manager/p2p_protoc.rs b/crates/core/src/node/conn_manager/p2p_protoc.rs index 4eee493ad..bc996135d 100644 --- a/crates/core/src/node/conn_manager/p2p_protoc.rs +++ b/crates/core/src/node/conn_manager/p2p_protoc.rs @@ -42,8 +42,8 @@ use crate::{ contract::{ClientResponsesSender, ExecutorToEventLoopChannel, NetworkEventListenerHalve}, message::{Message, NodeEvent, Transaction, TransactionType}, node::{ - handle_cancelled_op, join_ring_request, process_message, InitPeerNode, NodeBuilder, - OpManager, PeerKey, + handle_cancelled_op, join_ring_request, process_message, EventLogRegister, InitPeerNode, + NodeBuilder, OpManager, PeerKey, }, operations::OpError, ring::PeerKeyLocation, @@ -173,6 +173,7 @@ pub(in crate::node) struct P2pConnManager { conn_bridge_rx: Receiver, /// last valid observed public address public_addr: Option, + event_listener: Box, } impl P2pConnManager { @@ -180,6 +181,7 @@ impl P2pConnManager { transport: transport::Boxed<(PeerId, muxing::StreamMuxerBox)>, config: &NodeBuilder, op_manager: Arc, + event_listener: &dyn EventLogRegister, ) -> Result { // We set a global executor which is virtually the Tokio multi-threaded executor // to reuse it's thread pool and scheduler in order to drive futures. @@ -219,6 +221,7 @@ impl P2pConnManager { bridge, conn_bridge_rx: rx_bridge_cmd, public_addr, + event_listener: event_listener.trait_clone(), }) } @@ -396,7 +399,7 @@ impl P2pConnManager { Ok(msg), op_manager.clone(), cb, - None, + self.event_listener.trait_clone(), executor_callback, client_req_handler_callback, client_id, @@ -458,7 +461,7 @@ impl P2pConnManager { Err(err), op_manager.clone(), cb, - None, + self.event_listener.trait_clone(), None, None, None, diff --git a/crates/core/src/node/event_log.rs b/crates/core/src/node/event_log.rs index 7828c1139..479927890 100644 --- a/crates/core/src/node/event_log.rs +++ b/crates/core/src/node/event_log.rs @@ -1,11 +1,20 @@ use freenet_stdlib::prelude::*; +use futures::{future::BoxFuture, FutureExt}; +use serde::{Deserialize, Serialize}; +use tokio::{ + fs::OpenOptions, + sync::mpsc::{self}, +}; use super::PeerKey; use crate::{ + config::GlobalExecutor, contract::StoreResponse, message::{Message, Transaction}, operations::{get::GetMsg, join_ring::JoinRingMsg, put::PutMsg}, ring::{Location, PeerKeyLocation}, + router::RouteEvent, + DynError, }; #[cfg(test)] @@ -21,9 +30,15 @@ struct ListenerLogId(usize); /// /// This type then can emit it's own information to adjacent systems /// or is a no-op. -pub(crate) trait EventLogListener { - fn event_received(&mut self, ev: EventLog); - fn trait_clone(&self) -> Box; +pub(crate) trait EventLogRegister: std::any::Any + Send + Sync + 'static { + fn event_received<'a>(&'a mut self, ev: EventLog) -> BoxFuture<'a, Result<(), DynError>>; + fn trait_clone(&self) -> Box; + fn as_any(&self) -> &dyn std::any::Any + where + Self: Sized, + { + self as _ + } } #[allow(dead_code)] // fixme: remove this @@ -34,7 +49,19 @@ pub(crate) struct EventLog<'a> { } impl<'a> EventLog<'a> { - pub fn new(msg: &'a Message, op_storage: &'a OpManager) -> Self { + pub fn route_event( + tx: &'a Transaction, + op_storage: &'a OpManager, + route_event: &RouteEvent, + ) -> Self { + EventLog { + tx, + peer_id: &op_storage.ring.peer_key, + kind: EventKind::Route(route_event.clone()), + } + } + + pub fn from_msg(msg: &'a Message, op_storage: &'a OpManager) -> Self { let kind = match msg { Message::JoinRing(JoinRingMsg::Connected { sender, target, .. }) => { EventKind::Connected { @@ -47,47 +74,37 @@ impl<'a> EventLog<'a> { contract, target, .. }) => { let key = contract.key(); - EventKind::Put( - PutEvent::Request { - performer: target.peer, - key, - }, - *msg.id(), - ) + EventKind::Put(PutEvent::Request { + performer: target.peer, + key, + }) } - Message::Put(PutMsg::SuccessfulUpdate { new_value, .. }) => EventKind::Put( - PutEvent::PutSuccess { + Message::Put(PutMsg::SuccessfulUpdate { new_value, .. }) => { + EventKind::Put(PutEvent::PutSuccess { requester: op_storage.ring.peer_key, value: new_value.clone(), - }, - *msg.id(), - ), + }) + } Message::Put(PutMsg::Broadcasting { new_value, broadcast_to, key, .. - }) => EventKind::Put( - PutEvent::BroadcastEmitted { - broadcast_to: broadcast_to.clone(), - key: key.clone(), - value: new_value.clone(), - }, - *msg.id(), - ), + }) => EventKind::Put(PutEvent::BroadcastEmitted { + broadcast_to: broadcast_to.clone(), + key: key.clone(), + value: new_value.clone(), + }), Message::Put(PutMsg::BroadcastTo { sender, new_value, key, .. - }) => EventKind::Put( - PutEvent::BroadcastReceived { - requester: sender.peer, - key: key.clone(), - value: new_value.clone(), - }, - *msg.id(), - ), + }) => EventKind::Put(PutEvent::BroadcastReceived { + requester: sender.peer, + key: key.clone(), + value: new_value.clone(), + }), Message::Get(GetMsg::ReturnGet { key, value: StoreResponse { state: Some(_), .. }, @@ -103,41 +120,136 @@ impl<'a> EventLog<'a> { } } -#[cfg(test)] -struct MessageLog { +#[derive(Serialize, Deserialize)] +struct LogMessage { + tx: Transaction, peer_id: PeerKey, kind: EventKind, } #[derive(Clone)] -pub(super) struct EventRegister {} +pub(crate) struct EventRegister { + log_sender: mpsc::Sender, +} -impl EventLogListener for EventRegister { - fn event_received(&mut self, _log: EventLog) { - // let (_msg_log, _log_id) = create_log(log); - // TODO: save log +impl EventRegister { + pub fn new() -> Self { + let (log_sender, log_recv) = mpsc::channel(1000); + GlobalExecutor::spawn(Self::record_logs(log_recv)); + Self { log_sender } } - fn trait_clone(&self) -> Box { + async fn record_logs(mut log_recv: mpsc::Receiver) { + use tokio::io::AsyncWriteExt; + let event_log_path = crate::config::Config::get_static_conf().event_log(); + let mut event_log = match OpenOptions::new().write(true).open(&event_log_path).await { + Ok(file) => file, + Err(err) => { + tracing::error!("failed openning log file {:?} with: {err}", event_log_path); + panic!("failed openning log file"); // todo: propagate this to the main thread + } + }; + let mut num_written = 0; + let mut buf = vec![]; + while let Some(log) = log_recv.recv().await { + if let Err(err) = bincode::serialize_into(&mut buf, &log) { + tracing::error!("failed serializing log: {err}"); + panic!("failed serializing log"); + } + buf.push(b'\n'); + num_written += 1; + if num_written == 100 { + if let Err(err) = event_log.write_all(&buf).await { + tracing::error!("failed writting to event log: {err}"); + panic!("failed writting event log"); + } + num_written = 0; + buf.clear(); + } + } + } + + pub async fn get_router_events(max_event_number: usize) -> Result, DynError> { + use tokio::io::AsyncReadExt; + const BUF_SIZE: usize = 4096; + const MAX_EVENT_HISTORY: usize = 10_000; + let event_num = max_event_number.min(MAX_EVENT_HISTORY); + + let event_log_path = crate::config::Config::get_static_conf().event_log(); + let mut event_log = OpenOptions::new().read(true).open(event_log_path).await?; + + let mut buf = [0; BUF_SIZE]; + let mut records = Vec::with_capacity(event_num); + let mut partial_record = vec![]; + let mut record_start = 0; + + while records.len() < event_num { + let bytes_read = event_log.read(&mut buf).await?; + if bytes_read == 0 { + break; // EOF + } + + let mut found_newline = false; + for (i, byte) in buf.iter().enumerate().take(bytes_read) { + if byte == &b'\n' { + found_newline = true; + let rec = &buf[record_start..i]; + let deser_record: LogMessage = if partial_record.is_empty() { + record_start = i + 1; + bincode::deserialize(rec)? + } else { + partial_record.extend(rec); + let rec = bincode::deserialize(&partial_record)?; + partial_record.clear(); + rec + }; + if let EventKind::Route(outcome) = deser_record.kind { + records.push(outcome); + } + } + if records.len() == event_num { + break; // Reached the desired event number + } + } + if !found_newline { + break; // No more data to read, and event_num not reached + } + } + Ok(records) + } +} + +impl EventLogRegister for EventRegister { + fn event_received<'a>(&'a mut self, log: EventLog) -> BoxFuture<'a, Result<(), DynError>> { + let log_msg = LogMessage { + tx: *log.tx, + kind: log.kind, + peer_id: *log.peer_id, + }; + async { Ok(self.log_sender.send(log_msg).await?) }.boxed() + } + + fn trait_clone(&self) -> Box { Box::new(self.clone()) } } -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Serialize, Deserialize)] enum EventKind { Connected { loc: Location, from: PeerKey, to: PeerKeyLocation, }, - Put(PutEvent, Transaction), + Put(PutEvent), Get { key: ContractKey, }, + Route(RouteEvent), Unknown, } -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] enum PutEvent { Request { performer: PeerKey, @@ -166,7 +278,7 @@ enum PutEvent { } #[cfg(test)] -mod test_utils { +pub(super) mod test_utils { use std::{ collections::HashMap, sync::{ @@ -183,22 +295,11 @@ mod test_utils { static LOG_ID: AtomicUsize = AtomicUsize::new(0); - #[inline] - fn create_log(log: EventLog) -> (MessageLog, ListenerLogId) { - let log_id = ListenerLogId(LOG_ID.fetch_add(1, SeqCst)); - let EventLog { peer_id, kind, .. } = log; - let msg_log = MessageLog { - peer_id: *peer_id, - kind, - }; - (msg_log, log_id) - } - #[derive(Clone)] pub(crate) struct TestEventListener { node_labels: Arc>, tx_log: Arc>>, - logs: Arc>>, + logs: Arc>>, } impl TestEventListener { @@ -228,7 +329,7 @@ mod test_utils { ) -> bool { let logs = self.logs.read(); let put_ops = logs.iter().filter_map(|l| match &l.kind { - EventKind::Put(ev, id) => Some((id, ev)), + EventKind::Put(ev) => Some((&l.tx, ev)), _ => None, }); let put_ops: HashMap<_, Vec<_>> = put_ops.fold(HashMap::new(), |mut acc, (id, ev)| { @@ -266,8 +367,8 @@ mod test_utils { pub fn contract_broadcasted(&self, for_key: &ContractKey) -> bool { let logs = self.logs.read(); let put_broadcast_ops = logs.iter().filter_map(|l| match &l.kind { - EventKind::Put(ev @ PutEvent::BroadcastEmitted { .. }, id) - | EventKind::Put(ev @ PutEvent::BroadcastReceived { .. }, id) => Some((id, ev)), + EventKind::Put(ev @ PutEvent::BroadcastEmitted { .. }) + | EventKind::Put(ev @ PutEvent::BroadcastReceived { .. }) => Some((&l.tx, ev)), _ => None, }); let put_broadcast_by_tx: HashMap<_, Vec<_>> = @@ -319,19 +420,31 @@ mod test_utils { .collect::>() .into_iter() } + + fn create_log(log: EventLog) -> (LogMessage, ListenerLogId) { + let log_id = ListenerLogId(LOG_ID.fetch_add(1, SeqCst)); + let EventLog { peer_id, kind, .. } = log; + let msg_log = LogMessage { + tx: *log.tx, + peer_id: *peer_id, + kind, + }; + (msg_log, log_id) + } } - impl super::EventLogListener for TestEventListener { - fn event_received(&mut self, log: EventLog) { + impl super::EventLogRegister for TestEventListener { + fn event_received<'a>(&'a mut self, log: EventLog) -> BoxFuture<'a, Result<(), DynError>> { let tx = log.tx; let mut logs = self.logs.write(); - let (msg_log, log_id) = create_log(log); + let (msg_log, log_id) = Self::create_log(log); logs.push(msg_log); std::mem::drop(logs); self.tx_log.entry(*tx).or_default().push(log_id); + async { Ok(()) }.boxed() } - fn trait_clone(&self) -> Box { + fn trait_clone(&self) -> Box { Box::new(self.clone()) } } diff --git a/crates/core/src/node/in_memory_impl.rs b/crates/core/src/node/in_memory_impl.rs index 84b07142a..13a12491a 100644 --- a/crates/core/src/node/in_memory_impl.rs +++ b/crates/core/src/node/in_memory_impl.rs @@ -6,10 +6,9 @@ use freenet_stdlib::prelude::*; use super::{ client_event_handling, conn_manager::{in_memory::MemoryConnManager, EventLoopNotifications}, - event_log::EventLogListener, handle_cancelled_op, join_ring_request, op_state::OpManager, - process_message, PeerKey, + process_message, EventLogRegister, PeerKey, }; use crate::{ client_events::ClientEventsProxy, @@ -31,16 +30,16 @@ pub(super) struct NodeInMemory { gateways: Vec, notification_channel: EventLoopNotifications, conn_manager: MemoryConnManager, - event_listener: Option>, + event_listener: Box, is_gateway: bool, _executor_listener: ExecutorToEventLoopChannel, } impl NodeInMemory { /// Buils an in-memory node. Does nothing upon construction, - pub async fn build( + pub async fn build( builder: NodeBuilder<1>, - event_listener: Option>, + event_listener: EL, ch_builder: CH::Builder, ) -> Result where @@ -51,7 +50,7 @@ impl NodeInMemory { let gateways = builder.get_gateways()?; let is_gateway = builder.local_ip.zip(builder.local_port).is_some(); - let ring = Ring::new(&builder, &gateways)?; + let ring = Ring::new::<1, EL>(&builder, &gateways)?; let (notification_channel, notification_tx) = EventLoopNotifications::channel(); let (ops_ch_channel, ch_channel) = contract::contract_handler_channel(); let op_storage = Arc::new(OpManager::new(ring, notification_tx, ops_ch_channel)); @@ -68,7 +67,7 @@ impl NodeInMemory { op_storage, gateways, notification_channel, - event_listener, + event_listener: Box::new(event_listener), is_gateway, _executor_listener, }) @@ -207,10 +206,7 @@ impl NodeInMemory { let op_storage = self.op_storage.clone(); let conn_manager = self.conn_manager.clone(); - let event_listener = self - .event_listener - .as_ref() - .map(|listener| listener.trait_clone()); + let event_listener = self.event_listener.trait_clone(); GlobalExecutor::spawn(process_message( msg, diff --git a/crates/core/src/node/p2p_impl.rs b/crates/core/src/node/p2p_impl.rs index 4e44ae477..e3400dfd7 100644 --- a/crates/core/src/node/p2p_impl.rs +++ b/crates/core/src/node/p2p_impl.rs @@ -14,7 +14,7 @@ use libp2p::{ use super::{ client_event_handling, conn_manager::{p2p_protoc::P2pConnManager, EventLoopNotifications}, - join_ring_request, PeerKey, + join_ring_request, EventLogRegister, PeerKey, }; use crate::{ client_events::combinator::ClientEventsCombinator, @@ -35,7 +35,6 @@ pub(super) struct NodeP2P { pub(crate) op_manager: Arc, notification_channel: EventLoopNotifications, pub(super) conn_manager: P2pConnManager, - // event_listener: Option>, is_gateway: bool, executor_listener: ExecutorToEventLoopChannel, cli_response_sender: ClientResponsesSender, @@ -75,8 +74,9 @@ impl NodeP2P { .await } - pub(crate) async fn build( + pub(crate) async fn build( builder: NodeBuilder, + event_listener: EL, ch_builder: CH::Builder, ) -> Result where @@ -85,7 +85,8 @@ impl NodeP2P { let peer_key = PeerKey::from(builder.local_key.public()); let gateways = builder.get_gateways()?; - let ring = Ring::new(&builder, &gateways)?; + let event_listener: Box = Box::new(event_listener); + let ring = Ring::new::(&builder, &gateways)?; let (notification_channel, notification_tx) = EventLoopNotifications::channel(); let (ch_outbound, ch_inbound) = contract::contract_handler_channel(); let (client_responses, cli_response_sender) = contract::ClientResponses::channel(); @@ -97,7 +98,7 @@ impl NodeP2P { let conn_manager = { let transport = Self::config_transport(&builder.local_key)?; - P2pConnManager::build(transport, &builder, op_storage.clone())? + P2pConnManager::build(transport, &builder, op_storage.clone(), &*event_listener)? }; GlobalExecutor::spawn(contract::contract_handling(contract_handler)); @@ -152,7 +153,7 @@ mod test { client_events::test::MemoryEventsGen, config::GlobalExecutor, contract::MemoryContractHandler, - node::{tests::get_free_port, InitPeerNode}, + node::{event_log, tests::get_free_port, InitPeerNode}, ring::Location, }; @@ -208,7 +209,14 @@ mod test { .with_ip(Ipv4Addr::LOCALHOST) .with_port(peer1_port) .with_key(peer1_key); - let mut peer1 = Box::new(NodeP2P::build::(config, ()).await?); + let mut peer1 = Box::new( + NodeP2P::build::( + config, + event_log::TestEventListener::new(), + (), + ) + .await?, + ); peer1.conn_manager.listen_on()?; ping_ev_loop(&mut peer1).await.unwrap(); Ok::<_, anyhow::Error>(()) @@ -219,9 +227,13 @@ mod test { let user_events = MemoryEventsGen::new(receiver2, PeerKey::from(peer2_id)); let mut config = NodeBuilder::new([Box::new(user_events)]); config.add_gateway(peer1_config.clone()); - let mut peer2 = NodeP2P::build::(config, ()) - .await - .unwrap(); + let mut peer2 = NodeP2P::build::( + config, + event_log::TestEventListener::new(), + (), + ) + .await + .unwrap(); // wait a bit to make sure the first peer is up and listening tokio::time::sleep(Duration::from_millis(10)).await; peer2 diff --git a/crates/core/src/node/tests.rs b/crates/core/src/node/tests.rs index e4eba6473..88b9d1d68 100644 --- a/crates/core/src/node/tests.rs +++ b/crates/core/src/node/tests.rs @@ -205,9 +205,9 @@ impl SimNetwork { self.event_listener .add_node(label.clone(), PeerKey::from(id)); - let node = NodeInMemory::build::( + let node = NodeInMemory::build::( config, - Some(Box::new(self.event_listener.clone())), + self.event_listener.clone(), (), ) .await diff --git a/crates/core/src/operations/join_ring.rs b/crates/core/src/operations/join_ring.rs index 876549dec..3a88cb3ed 100644 --- a/crates/core/src/operations/join_ring.rs +++ b/crates/core/src/operations/join_ring.rs @@ -126,6 +126,7 @@ impl Operation for JoinRingOp { ); let new_location = Location::random(); + // FIXME: don't try to forward to peers which have already been tried (add a rejected_by list) let accepted_by = if op_storage.ring.should_accept(&new_location) { tracing::debug!("Accepting connection from {}", req_peer,); HashSet::from_iter([this_node_loc]) @@ -951,6 +952,15 @@ mod messages { } } + /* + + Peer A ---> Peer B (forward) ----> Peer C + |----- (forward) ---------> Peer D + + + Peer A ---> Peer B (forward) ----> Peer C ----> Peer D + + */ #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub(crate) enum JoinRequest { StartReq { diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 60e20361e..27f91e33c 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -31,31 +31,32 @@ pub(crate) struct PutOp { impl PutOp { pub(super) fn outcome(&self) -> OpOutcome { - match &self.stats { - Some(PutStats { - contract_location, - payload_size, - // first_response_time: Some((response_start, Some(response_end))), - transfer_time: Some((transfer_start, Some(transfer_end))), - target: Some(target), - .. - }) => { - let payload_transfer_time = *transfer_end - *transfer_start; - // todo: check if this is correct - // in puts both times are equivalent since when the transfer is initialized - // it already contains the payload - let first_response_time = payload_transfer_time.clone(); - OpOutcome::ContractOpSuccess { - target_peer: target, - contract_location: *contract_location, - payload_size: *payload_size, - payload_transfer_time, - first_response_time, - } - } - Some(_) => OpOutcome::Incomplete, - None => OpOutcome::Irrelevant, - } + // todo: track in the future + // match &self.stats { + // Some(PutStats { + // contract_location, + // payload_size, + // // first_response_time: Some((response_start, Some(response_end))), + // transfer_time: Some((transfer_start, Some(transfer_end))), + // target: Some(target), + // .. + // }) => { + // let payload_transfer_time: Duration = *transfer_end - *transfer_start; + // // in puts both times are equivalent since when the transfer is initialized + // // it already contains the payload + // let first_response_time = payload_transfer_time; + // OpOutcome::ContractOpSuccess { + // target_peer: target, + // contract_location: *contract_location, + // payload_size: *payload_size, + // payload_transfer_time, + // first_response_time, + // } + // } + // Some(_) => OpOutcome::Incomplete, + // None => OpOutcome::Irrelevant, + // } + OpOutcome::Irrelevant } pub(super) fn finalized(&self) -> bool { @@ -85,8 +86,8 @@ impl PutOp { } struct PutStats { - contract_location: Location, - payload_size: usize, + // contract_location: Location, + // payload_size: usize, // /// (start, end) // first_response_time: Option<(Instant, Option)>, /// (start, end) @@ -596,7 +597,7 @@ pub(crate) fn start_op( ); let id = Transaction::new(::tx_type_id(), peer); - let payload_size = contract.data().len(); + // let payload_size = contract.data().len(); let state = Some(PutState::PrepareRequest { contract, value, @@ -607,8 +608,8 @@ pub(crate) fn start_op( id, state, stats: Some(PutStats { - contract_location, - payload_size, + // contract_location, + // payload_size, target: None, // first_response_time: None, transfer_time: None, diff --git a/crates/core/src/ring.rs b/crates/core/src/ring.rs index 6a655d341..3c670557a 100644 --- a/crates/core/src/ring.rs +++ b/crates/core/src/ring.rs @@ -19,6 +19,7 @@ use std::{ atomic::{AtomicU64, AtomicUsize, Ordering::SeqCst}, Arc, }, + time::Duration, }; use anyhow::bail; @@ -28,7 +29,8 @@ use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use crate::{ - node::{self, NodeBuilder, PeerKey}, + config::GlobalExecutor, + node::{self, EventLogRegister, EventRegister, NodeBuilder, PeerKey}, router::Router, }; @@ -114,7 +116,7 @@ impl Ring { /// connection of a peer in the network). const MAX_HOPS_TO_LIVE: usize = 10; - pub fn new( + pub fn new( config: &NodeBuilder, gateways: &[PeerKeyLocation], ) -> Result { @@ -147,14 +149,15 @@ impl Ring { Self::MAX_CONNECTIONS }; - let router = Router::new(&[]); + let router = Arc::new(RwLock::new(Router::new(&[]))); + GlobalExecutor::spawn(Self::refresh_router::(router.clone())); let ring = Ring { rnd_if_htl_above, max_hops_to_live, max_connections, min_connections, - router: Arc::new(RwLock::new(router)), + router, connections_by_location: Arc::new(RwLock::new(BTreeMap::new())), location_for_peer: Arc::new(RwLock::new(BTreeMap::new())), cached_contracts: DashSet::new(), @@ -180,6 +183,21 @@ impl Ring { Ok(ring) } + async fn refresh_router(router: Arc>) { + let mut interval = tokio::time::interval(Duration::from_secs(60 * 5)); + interval.tick().await; + loop { + interval.tick().await; + let history = if std::any::type_name::() == std::any::type_name::() { + EventRegister::get_router_events(10_000).await.unwrap() + } else { + vec![] + }; + let router_ref = &mut *router.write(); + *router_ref = Router::new(&history); + } + } + #[inline(always)] /// Return if a location is within appropiate caching distance. pub fn within_caching_distance(&self, _loc: &Location) -> bool { @@ -570,7 +588,7 @@ mod test { let (_, receiver) = channel((0, peer_key)); let user_events = MemoryEventsGen::new(receiver, peer_key); let config = NodeBuilder::new([Box::new(user_events)]); - let ring = Ring::new(&config, &[]).unwrap(); + let ring = Ring::new::<1, node::TestEventListener>(&config, &[]).unwrap(); fn build_pk(loc: Location) -> PeerKeyLocation { PeerKeyLocation { diff --git a/crates/core/src/router.rs b/crates/core/src/router.rs index bfa079bc7..ce3624d44 100644 --- a/crates/core/src/router.rs +++ b/crates/core/src/router.rs @@ -4,10 +4,11 @@ mod util; use crate::ring::{Location, PeerKeyLocation}; use isotonic_estimator::{EstimatorType, IsotonicEstimator, IsotonicEvent}; -use serde::Serialize; -use std::{fmt, time::Duration}; +use serde::{Deserialize, Serialize}; +use std::time::Duration; use util::{Mean, TransferSpeed}; +// Important: Need to periodically rebuild the Router using `history` for better predictions. #[derive(Debug, Clone, Serialize)] pub(crate) struct Router { response_start_time_estimator: IsotonicEstimator, @@ -193,23 +194,23 @@ impl Router { let time_to_response_start_estimate = self .response_start_time_estimator .estimate_retrieval_time(peer, contract_location) - .map_err(|e| { - RoutingError::EstimationError(format!( - "Response Start Time Estimation failed: {}", - e - )) + .map_err(|source| RoutingError::EstimationError { + estimation: "start time", + source, })?; let failure_estimate = self .failure_estimator .estimate_retrieval_time(peer, contract_location) - .map_err(|e| { - RoutingError::EstimationError(format!("Failure Estimation failed: {}", e)) + .map_err(|source| RoutingError::EstimationError { + estimation: "failure", + source, })?; let transfer_rate_estimate = self .transfer_rate_estimator .estimate_retrieval_time(peer, contract_location) - .map_err(|e| { - RoutingError::EstimationError(format!("Transfer Rate Estimation failed: {}", e)) + .map_err(|source| RoutingError::EstimationError { + estimation: "transfer rate", + source, })?; // This is a fairly naive approach, assuming that the cost of a failure is a multiple @@ -236,19 +237,16 @@ impl Router { } } -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] enum RoutingError { + #[error("Insufficient data provided")] InsufficientDataError, - EstimationError(String), -} - -impl fmt::Display for RoutingError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - RoutingError::InsufficientDataError => write!(f, "Insufficient data provided"), - RoutingError::EstimationError(err_msg) => write!(f, "Estimation error: {}", err_msg), - } - } + #[error("failed {estimation} estimation: {source}")] + EstimationError { + estimation: &'static str, + #[source] + source: isotonic_estimator::EstimationError, + }, } #[derive(Debug, Clone, Copy, Serialize)] @@ -259,14 +257,14 @@ struct RoutingPrediction { expected_total_time: f64, } -#[derive(Debug, Clone, Copy, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct RouteEvent { pub peer: PeerKeyLocation, pub contract_location: Location, pub outcome: RouteOutcome, } -#[derive(Debug, Clone, Copy, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum RouteOutcome { Success { time_to_response_start: Duration, diff --git a/crates/core/src/router/isotonic_estimator.rs b/crates/core/src/router/isotonic_estimator.rs index 2ba5d5dd8..d987917e1 100644 --- a/crates/core/src/router/isotonic_estimator.rs +++ b/crates/core/src/router/isotonic_estimator.rs @@ -1,7 +1,7 @@ use crate::ring::{Distance, Location, PeerKeyLocation}; use pav_regression::pav::{IsotonicRegression, Point}; use serde::Serialize; -use std::{collections::HashMap, fmt}; +use std::collections::HashMap; const MIN_POINTS_FOR_REGRESSION: usize = 5; @@ -155,17 +155,10 @@ pub(super) enum EstimatorType { Negative, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq, Eq, thiserror::Error)] pub(super) enum EstimationError { - InsufficientData, // Error indicating that there is not enough data for estimation -} - -impl fmt::Display for EstimationError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - EstimationError::InsufficientData => write!(f, "Insufficient data for estimation"), - } - } + #[error("Insufficient data for estimation")] + InsufficientData, } /// A routing event is a single request to a peer for a contract, and some value indicating diff --git a/crates/core/src/router/util.rs b/crates/core/src/router/util.rs index 0f78072d0..7aeca20a6 100644 --- a/crates/core/src/router/util.rs +++ b/crates/core/src/router/util.rs @@ -2,7 +2,7 @@ use std::time::Duration; use serde::Serialize; -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Copy, Serialize)] pub(super) struct Mean { sum: f64, count: u64, From b82567738c4a59a2b5caa6275e0e72e50f5055c3 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Thu, 12 Oct 2023 16:42:29 +0200 Subject: [PATCH 3/5] Use router to route requests. --- crates/core/src/operations/get.rs | 12 +++++--- crates/core/src/operations/join_ring.rs | 14 +++------- crates/core/src/operations/put.rs | 6 ++-- crates/core/src/operations/subscribe.rs | 12 +++++--- crates/core/src/ring.rs | 37 +++++++++---------------- crates/core/src/router.rs | 4 ++- 6 files changed, 39 insertions(+), 46 deletions(-) diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs index e8227e5d0..56d03bc11 100644 --- a/crates/core/src/operations/get.rs +++ b/crates/core/src/operations/get.rs @@ -268,8 +268,12 @@ impl Operation for GetOp { } let new_htl = htl - 1; - let new_target = - op_storage.ring.closest_caching(&key, 1, &[sender.peer])[0]; + let Some(new_target) = + op_storage.ring.closest_caching(&key, &[sender.peer]) + else { + tracing::warn!("no peer found while trying getting contract {key}"); + return Err(OpError::RingError(RingError::NoCachingPeers(key))); + }; continue_seeking( conn_manager, @@ -373,7 +377,7 @@ impl Operation for GetOp { skip_list.push(target.peer); if let Some(target) = op_storage .ring - .closest_caching(&key, 1, skip_list.as_slice()) + .closest_caching(&key, skip_list.as_slice()) .into_iter() .next() { @@ -662,7 +666,7 @@ pub(crate) async fn request_get( ( op_storage .ring - .closest_caching(key, 1, &[]) + .closest_caching(key, &[]) .into_iter() .next() .ok_or(RingError::EmptyRing)?, diff --git a/crates/core/src/operations/join_ring.rs b/crates/core/src/operations/join_ring.rs index 3a88cb3ed..32415cade 100644 --- a/crates/core/src/operations/join_ring.rs +++ b/crates/core/src/operations/join_ring.rs @@ -795,14 +795,8 @@ where "Selecting close peer to forward request (requester: {})", req_peer.peer ); - ring.routing( - &new_peer_loc.location.unwrap(), - Some(&req_peer.peer), - 1, - &[], - ) - .pop() - .filter(|&pkl| pkl.peer != new_peer_loc.peer) + ring.routing(&new_peer_loc.location.unwrap(), Some(&req_peer.peer), &[]) + .and_then(|pkl| (pkl.peer != new_peer_loc.peer).then_some(pkl)) }; if let Some(forward_to) = forward_to { @@ -953,10 +947,10 @@ mod messages { } /* - + Peer A ---> Peer B (forward) ----> Peer C |----- (forward) ---------> Peer D - + Peer A ---> Peer B (forward) ----> Peer C ----> Peer D diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 27f91e33c..181a1ffc6 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -651,7 +651,7 @@ pub(crate) async fn request_put( // - and the value to put let target = op_storage .ring - .closest_caching(&key, 1, &[sender.peer]) + .closest_caching(&key, &[sender.peer]) .into_iter() .next() .ok_or(RingError::EmptyRing)?; @@ -737,9 +737,9 @@ async fn forward_changes( { let key = contract.key(); let contract_loc = Location::from(&key); - let forward_to = op_storage.ring.closest_caching(&key, 1, skip_list); + let forward_to = op_storage.ring.closest_caching(&key, skip_list); let own_loc = op_storage.ring.own_location().location.expect("infallible"); - for peer in forward_to { + if let Some(peer) = forward_to { let other_loc = peer.location.as_ref().expect("infallible"); let other_distance = contract_loc.distance(other_loc); let self_distance = contract_loc.distance(own_loc); diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 8bc768bfb..84b03563d 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -146,8 +146,12 @@ impl Operation for SubscribeOp { tracing::info!("Contract {} not found while processing info", key); tracing::info!("Trying to found the contract from another node"); - let new_target = - op_storage.ring.closest_caching(&key, 1, &[sender.peer])[0]; + let Some(new_target) = + op_storage.ring.closest_caching(&key, &[sender.peer]) + else { + tracing::warn!("no peer found while trying getting contract {key}"); + return Err(OpError::RingError(RingError::NoCachingPeers(key))); + }; let new_htl = htl + 1; if new_htl > MAX_RETRIES { @@ -220,7 +224,7 @@ impl Operation for SubscribeOp { skip_list.push(sender.peer); if let Some(target) = op_storage .ring - .closest_caching(&key, 1, skip_list.as_slice()) + .closest_caching(&key, skip_list.as_slice()) .into_iter() .next() { @@ -337,7 +341,7 @@ pub(crate) async fn request_subscribe( ( op_storage .ring - .closest_caching(key, 1, &[]) + .closest_caching(key, &[]) .into_iter() .next() .ok_or(RingError::EmptyRing)?, diff --git a/crates/core/src/ring.rs b/crates/core/src/ring.rs index 3c670557a..e33c2551c 100644 --- a/crates/core/src/ring.rs +++ b/crates/core/src/ring.rs @@ -303,30 +303,25 @@ impl Ring { Some(conn_by_dist[idx]) } - /// Return the closest peers to a contract location which are caching it, - /// excluding whichever peers in the skip list. + /// Return the most optimal peer caching a given contract. #[inline] pub fn closest_caching( &self, contract_key: &ContractKey, - n: usize, skip_list: &[PeerKey], - ) -> Vec { - // Right now we return just the closest known peers to that location. - // In the future this may change to the ones closest which are actually already caching it. - self.routing(&Location::from(contract_key), None, n, skip_list) + ) -> Option { + self.routing(&Location::from(contract_key), None, skip_list) } - /// Find the closest number of peers to a given location. Result is returned sorted by proximity. + /// Route an op to the most optimal target. pub fn routing( &self, target: &Location, requesting: Option<&PeerKey>, - n: usize, skip_list: &[PeerKey], - ) -> Vec { + ) -> Option { let connections = self.connections_by_location.read(); - let mut conn_by_dist: Vec<_> = connections + let peers = connections .iter() .filter(|(_, pkloc)| { if let Some(requester) = requesting { @@ -336,11 +331,9 @@ impl Ring { } !skip_list.contains(&pkloc.peer) }) - .map(|(loc, peer)| (loc.distance(target), (loc, peer))) - .collect(); - conn_by_dist.sort_by_key(|&(dist, _)| dist); - let iter = conn_by_dist.into_iter().map(|(_, v)| *v.1).take(n); - iter.collect() + .map(|(_, peer)| peer); + let router = &*self.router.read(); + router.select_peer(peers, target).cloned() } pub fn routing_finished(&self, event: crate::router::RouteEvent) { @@ -606,32 +599,28 @@ mod test { assert_eq!( Location(0.0), - ring.routing(&Location(0.9), None, 1, &[]) - .first() + ring.routing(&Location(0.9), None, &[]) .unwrap() .location .unwrap() ); assert_eq!( Location(0.0), - ring.routing(&Location(0.1), None, 1, &[]) - .first() + ring.routing(&Location(0.1), None, &[]) .unwrap() .location .unwrap() ); assert_eq!( Location(0.5), - ring.routing(&Location(0.41), None, 1, &[]) - .first() + ring.routing(&Location(0.41), None, &[]) .unwrap() .location .unwrap() ); assert_eq!( Location(0.3), - ring.routing(&Location(0.39), None, 1, &[]) - .first() + ring.routing(&Location(0.39), None, &[]) .unwrap() .location .unwrap() diff --git a/crates/core/src/router.rs b/crates/core/src/router.rs index ce3624d44..1662169dc 100644 --- a/crates/core/src/router.rs +++ b/crates/core/src/router.rs @@ -8,7 +8,9 @@ use serde::{Deserialize, Serialize}; use std::time::Duration; use util::{Mean, TransferSpeed}; -// Important: Need to periodically rebuild the Router using `history` for better predictions. +/// # Usage +/// Important when using this type: +/// Need to periodically rebuild the Router using `history` for better predictions. #[derive(Debug, Clone, Serialize)] pub(crate) struct Router { response_start_time_estimator: IsotonicEstimator, From e8fa85dd4f01a56e7d2ae6b6405a9e75809e0a68 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Thu, 12 Oct 2023 17:25:23 +0200 Subject: [PATCH 4/5] Truncate record when maxed --- crates/core/src/node/event_log.rs | 89 ++++++++++++++++++++++++++++--- 1 file changed, 83 insertions(+), 6 deletions(-) diff --git a/crates/core/src/node/event_log.rs b/crates/core/src/node/event_log.rs index 479927890..47f08d930 100644 --- a/crates/core/src/node/event_log.rs +++ b/crates/core/src/node/event_log.rs @@ -1,3 +1,5 @@ +use std::{io, path::Path}; + use freenet_stdlib::prelude::*; use futures::{future::BoxFuture, FutureExt}; use serde::{Deserialize, Serialize}; @@ -140,32 +142,107 @@ impl EventRegister { } async fn record_logs(mut log_recv: mpsc::Receiver) { + const MAX_LOG_RECORDS: usize = 100_000; + + async fn num_lines(path: &Path) -> io::Result { + use tokio::fs::File; + use tokio::io::{AsyncBufReadExt, BufReader}; + + let file = File::open(path).await.expect("Failed to open log file"); + let reader = BufReader::new(file); + let mut num_lines = 0; + let mut lines = reader.lines(); + while lines.next_line().await?.is_some() { + num_lines += 1; + } + Ok(num_lines) + } + + async fn truncate_lines( + file: &mut tokio::fs::File, + lines_to_keep: usize, + ) -> Result<(), Box> { + use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt}; + + file.seek(io::SeekFrom::Start(0)).await?; + let file_metadata = file.metadata().await?; + let file_size = file_metadata.len(); + let mut reader = tokio::io::BufReader::new(file); + + let mut buffer = Vec::with_capacity(file_size as usize); + let mut lines_count = 0; + + let mut line = Vec::new(); + let mut discard_bytes = 0; + + while lines_count < lines_to_keep { + let bytes_read = reader.read_until(b'\n', &mut line).await?; + if bytes_read == 0 { + // EOF + break; + } + lines_count += 1; + discard_bytes += bytes_read; + line.clear(); + } + + // Copy the rest of the file to the buffer + while let Ok(bytes_read) = reader.read_buf(&mut buffer).await { + if bytes_read == 0 { + // EOF + break; + } + } + + // Seek back to the beginning and write the remaining content let file = reader.into_inner(); + let file = reader.into_inner(); + file.seek(io::SeekFrom::Start(0)).await?; + file.write_all(&buffer).await?; + + // Truncate the file to the new size + file.set_len(file_size - discard_bytes as u64).await?; + file.seek(io::SeekFrom::End(0)).await?; + Ok(()) + } + use tokio::io::AsyncWriteExt; let event_log_path = crate::config::Config::get_static_conf().event_log(); let mut event_log = match OpenOptions::new().write(true).open(&event_log_path).await { Ok(file) => file, Err(err) => { - tracing::error!("failed openning log file {:?} with: {err}", event_log_path); - panic!("failed openning log file"); // todo: propagate this to the main thread + tracing::error!("Failed openning log file {:?} with: {err}", event_log_path); + panic!("Failed openning log file"); // todo: propagate this to the main thread } }; let mut num_written = 0; let mut buf = vec![]; while let Some(log) = log_recv.recv().await { if let Err(err) = bincode::serialize_into(&mut buf, &log) { - tracing::error!("failed serializing log: {err}"); - panic!("failed serializing log"); + tracing::error!("Failed serializing log: {err}"); + panic!("Failed serializing log"); } buf.push(b'\n'); num_written += 1; if num_written == 100 { if let Err(err) = event_log.write_all(&buf).await { - tracing::error!("failed writting to event log: {err}"); - panic!("failed writting event log"); + tracing::error!("Failed writting to event log: {err}"); + panic!("Failed writting event log"); } num_written = 0; buf.clear(); } + + // Check the number of lines and truncate if needed + let num_lines = num_lines(event_log_path.as_path()) + .await + .expect("non IO error"); + if num_lines > MAX_LOG_RECORDS { + let truncate_to = num_lines - MAX_LOG_RECORDS + 900; // make some extra space removing 1000 old records + if let Err(err) = truncate_lines(&mut event_log, truncate_to).await { + tracing::error!("Failed truncating log file: {:?}", err); + panic!("Failed truncating log file"); + } + } } } From 4c76a8e71b44b2a2b6bac1bd8ed29b29501d43c9 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Fri, 13 Oct 2023 08:58:35 +0200 Subject: [PATCH 5/5] Don't load old records Don't block while writing to log file --- crates/core/src/config.rs | 4 +- crates/core/src/contract/executor.rs | 2 +- crates/core/src/contract/in_memory.rs | 2 +- crates/core/src/contract/storages/rocks_db.rs | 2 +- crates/core/src/contract/storages/sqlite.rs | 2 +- crates/core/src/node.rs | 2 +- crates/core/src/node/event_log.rs | 70 +++++++++++++++---- crates/fdev/src/commands.rs | 6 +- crates/fdev/src/local_node/state.rs | 2 +- 9 files changed, 68 insertions(+), 24 deletions(-) diff --git a/crates/core/src/config.rs b/crates/core/src/config.rs index f8b8155dd..8a36a3b1e 100644 --- a/crates/core/src/config.rs +++ b/crates/core/src/config.rs @@ -149,7 +149,7 @@ impl Config { pub fn set_op_mode(mode: OperationMode) { let local_mode = matches!(mode, OperationMode::Local); - Self::get_static_conf() + Self::conf() .local_mode .store(local_mode, std::sync::atomic::Ordering::SeqCst); } @@ -196,7 +196,7 @@ impl Config { } } - pub fn get_static_conf() -> &'static Config { + pub fn conf() -> &'static Config { CONFIG.get_or_init(|| match Config::load_conf() { Ok(config) => config, Err(err) => { diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index b6a86baf2..a708ca866 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -342,7 +342,7 @@ impl Executor { pub async fn from_config(config: NodeConfig) -> Result { const MAX_SIZE: i64 = 10 * 1024 * 1024; const MAX_MEM_CACHE: u32 = 10_000_000; - let static_conf = crate::config::Config::get_static_conf(); + let static_conf = crate::config::Config::conf(); let state_store = StateStore::new(Storage::new().await?, MAX_MEM_CACHE).unwrap(); diff --git a/crates/core/src/contract/in_memory.rs b/crates/core/src/contract/in_memory.rs index 58fd9a5db..f390c7625 100644 --- a/crates/core/src/contract/in_memory.rs +++ b/crates/core/src/contract/in_memory.rs @@ -46,7 +46,7 @@ where _kv_store: StateStore::new(kv_store, 10_000_000).unwrap(), _runtime: MockRuntime { contract_store: ContractStore::new( - Config::get_static_conf().contracts_dir(), + Config::conf().contracts_dir(), Self::MAX_MEM_CACHE, ) .unwrap(), diff --git a/crates/core/src/contract/storages/rocks_db.rs b/crates/core/src/contract/storages/rocks_db.rs index 44d66de43..90850728f 100644 --- a/crates/core/src/contract/storages/rocks_db.rs +++ b/crates/core/src/contract/storages/rocks_db.rs @@ -9,7 +9,7 @@ pub struct RocksDb(DB); impl RocksDb { #[cfg_attr(feature = "sqlite", allow(unused))] pub async fn new() -> Result { - let path = Config::get_static_conf().db_dir().join("freenet.db"); + let path = Config::conf().db_dir().join("freenet.db"); tracing::info!("loading contract store from {path:?}"); let mut opts = Options::default(); diff --git a/crates/core/src/contract/storages/sqlite.rs b/crates/core/src/contract/storages/sqlite.rs index ec8bfbce5..253181b5f 100644 --- a/crates/core/src/contract/storages/sqlite.rs +++ b/crates/core/src/contract/storages/sqlite.rs @@ -18,7 +18,7 @@ static POOL: Lazy = Lazy::new(|| { let opts = if cfg!(test) { SqliteConnectOptions::from_str("sqlite::memory:").unwrap() } else { - let conn_str = Config::get_static_conf().db_dir().join("freenet.db"); + let conn_str = Config::conf().db_dir().join("freenet.db"); tracing::info!("loading contract store from {conn_str:?}"); SqliteConnectOptions::new() .create_if_missing(true) diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index a690b41a6..aa71a80ec 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -115,7 +115,7 @@ pub struct NodeBuilder { impl NodeBuilder { pub fn new(clients: [BoxedClient; CLIENTS]) -> NodeBuilder { - let local_key = if let Some(key) = &Config::get_static_conf().local_peer_keypair { + let local_key = if let Some(key) = &Config::conf().local_peer_keypair { key.clone() } else { identity::Keypair::generate_ed25519() diff --git a/crates/core/src/node/event_log.rs b/crates/core/src/node/event_log.rs index 47f08d930..e7d6bcee7 100644 --- a/crates/core/src/node/event_log.rs +++ b/crates/core/src/node/event_log.rs @@ -1,5 +1,6 @@ -use std::{io, path::Path}; +use std::{io, path::Path, time::SystemTime}; +use chrono::{DateTime, Utc}; use freenet_stdlib::prelude::*; use futures::{future::BoxFuture, FutureExt}; use serde::{Deserialize, Serialize}; @@ -125,6 +126,7 @@ impl<'a> EventLog<'a> { #[derive(Serialize, Deserialize)] struct LogMessage { tx: Transaction, + datetime: DateTime, peer_id: PeerKey, kind: EventKind, } @@ -134,15 +136,20 @@ pub(crate) struct EventRegister { log_sender: mpsc::Sender, } +/// Records from a new session must have higher than this ts. +static NEW_RECORDS_TS: std::sync::OnceLock = std::sync::OnceLock::new(); + impl EventRegister { pub fn new() -> Self { let (log_sender, log_recv) = mpsc::channel(1000); + NEW_RECORDS_TS.set(SystemTime::now()).expect("non set"); GlobalExecutor::spawn(Self::record_logs(log_recv)); Self { log_sender } } async fn record_logs(mut log_recv: mpsc::Receiver) { const MAX_LOG_RECORDS: usize = 100_000; + const BATCH_SIZE: usize = 100; async fn num_lines(path: &Path) -> io::Result { use tokio::fs::File; @@ -206,7 +213,7 @@ impl EventRegister { } use tokio::io::AsyncWriteExt; - let event_log_path = crate::config::Config::get_static_conf().event_log(); + let event_log_path = crate::config::Config::conf().event_log(); let mut event_log = match OpenOptions::new().write(true).open(&event_log_path).await { Ok(file) => file, Err(err) => { @@ -215,21 +222,47 @@ impl EventRegister { } }; let mut num_written = 0; - let mut buf = vec![]; + let mut batch_buf = Vec::with_capacity(BATCH_SIZE * 1024); + let mut log_batch = Vec::with_capacity(BATCH_SIZE); while let Some(log) = log_recv.recv().await { - if let Err(err) = bincode::serialize_into(&mut buf, &log) { - tracing::error!("Failed serializing log: {err}"); - panic!("Failed serializing log"); + log_batch.push(log); + + if log_batch.len() >= BATCH_SIZE { + let moved_batch = std::mem::replace(&mut log_batch, Vec::with_capacity(BATCH_SIZE)); + let serialization_task = tokio::task::spawn_blocking(move || { + let mut batch_serialized_data = Vec::new(); + for log_item in &moved_batch { + if let Err(err) = + bincode::serialize_into(&mut batch_serialized_data, log_item) + { + // Handle the error appropriately + tracing::error!("Failed serializing log: {err}"); + return Err(err); + } + batch_serialized_data.push(b'\n'); + } + Ok(batch_serialized_data) + }); + + match serialization_task.await { + Ok(Ok(mut serialized_data)) => { + batch_buf.append(&mut serialized_data); + num_written += log_batch.len(); + log_batch.clear(); // Clear the batch for new data + } + _ => { + panic!("Failed serializing log"); + } + } } - buf.push(b'\n'); - num_written += 1; - if num_written == 100 { - if let Err(err) = event_log.write_all(&buf).await { + + if num_written >= BATCH_SIZE { + if let Err(err) = event_log.write_all(&batch_buf).await { tracing::error!("Failed writting to event log: {err}"); panic!("Failed writting event log"); } num_written = 0; - buf.clear(); + batch_buf.clear(); } // Check the number of lines and truncate if needed @@ -252,7 +285,7 @@ impl EventRegister { const MAX_EVENT_HISTORY: usize = 10_000; let event_num = max_event_number.min(MAX_EVENT_HISTORY); - let event_log_path = crate::config::Config::get_static_conf().event_log(); + let event_log_path = crate::config::Config::conf().event_log(); let mut event_log = OpenOptions::new().read(true).open(event_log_path).await?; let mut buf = [0; BUF_SIZE]; @@ -260,6 +293,12 @@ impl EventRegister { let mut partial_record = vec![]; let mut record_start = 0; + let new_records_ts = NEW_RECORDS_TS + .get() + .expect("set on initialization") + .duration_since(std::time::UNIX_EPOCH) + .expect("should be older than unix epoch") + .as_secs() as i64; while records.len() < event_num { let bytes_read = event_log.read(&mut buf).await?; if bytes_read == 0 { @@ -281,7 +320,10 @@ impl EventRegister { rec }; if let EventKind::Route(outcome) = deser_record.kind { - records.push(outcome); + let record_ts = deser_record.datetime.timestamp(); + if record_ts > new_records_ts { + records.push(outcome); + } } } if records.len() == event_num { @@ -299,6 +341,7 @@ impl EventRegister { impl EventLogRegister for EventRegister { fn event_received<'a>(&'a mut self, log: EventLog) -> BoxFuture<'a, Result<(), DynError>> { let log_msg = LogMessage { + datetime: Utc::now(), tx: *log.tx, kind: log.kind, peer_id: *log.peer_id, @@ -502,6 +545,7 @@ pub(super) mod test_utils { let log_id = ListenerLogId(LOG_ID.fetch_add(1, SeqCst)); let EventLog { peer_id, kind, .. } = log; let msg_log = LogMessage { + datetime: Utc::now(), tx: *log.tx, peer_id: *peer_id, kind, diff --git a/crates/fdev/src/commands.rs b/crates/fdev/src/commands.rs index 957b24e51..2043ca7b9 100644 --- a/crates/fdev/src/commands.rs +++ b/crates/fdev/src/commands.rs @@ -156,13 +156,13 @@ async fn execute_command( ) -> Result<(), DynError> { let contracts_data_path = other .contract_data_dir - .unwrap_or_else(|| Config::get_static_conf().contracts_dir()); + .unwrap_or_else(|| Config::conf().contracts_dir()); let delegates_data_path = other .delegate_data_dir - .unwrap_or_else(|| Config::get_static_conf().delegates_dir()); + .unwrap_or_else(|| Config::conf().delegates_dir()); let secrets_data_path = other .secret_data_dir - .unwrap_or_else(|| Config::get_static_conf().secrets_dir()); + .unwrap_or_else(|| Config::conf().secrets_dir()); let contract_store = ContractStore::new(contracts_data_path, DEFAULT_MAX_CONTRACT_SIZE)?; let delegate_store = DelegateStore::new(delegates_data_path, DEFAULT_MAX_DELEGATE_SIZE)?; diff --git a/crates/fdev/src/local_node/state.rs b/crates/fdev/src/local_node/state.rs index 4501ccc0f..0ded9ce22 100644 --- a/crates/fdev/src/local_node/state.rs +++ b/crates/fdev/src/local_node/state.rs @@ -21,7 +21,7 @@ impl AppState { const MAX_MEM_CACHE: u32 = 10_000_000; pub async fn new(config: &LocalNodeCliConfig) -> Result { - let contract_dir = Config::get_static_conf().contracts_dir(); + let contract_dir = Config::conf().contracts_dir(); let contract_store = ContractStore::new(contract_dir, config.max_contract_size)?; let state_store = StateStore::new(Storage::new().await?, Self::MAX_MEM_CACHE).unwrap(); Ok(AppState {