diff --git a/crates/core/src/node/network_bridge.rs b/crates/core/src/node/network_bridge.rs index 724b1d918..ec0465b93 100644 --- a/crates/core/src/node/network_bridge.rs +++ b/crates/core/src/node/network_bridge.rs @@ -14,10 +14,6 @@ mod handshake; pub(crate) mod in_memory; pub(crate) mod p2p_protoc; -// TODO: use this constants when we do real net i/o -// const PING_EVERY: Duration = Duration::from_secs(30); -// const DROP_CONN_AFTER: Duration = Duration::from_secs(30 * 10); - pub(crate) type ConnResult = std::result::Result; /// Allows handling of connections to the network as well as sending messages diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index e7aa16acf..8d38a67a4 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -1,5 +1,5 @@ -use freenet_stdlib::client_api::{ErrorKind, HostResponse}; // TODO: complete update logic in the network +use freenet_stdlib::client_api::{ErrorKind, HostResponse}; use freenet_stdlib::prelude::*; use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; @@ -28,24 +28,6 @@ impl UpdateOp { matches!(self.state, None | Some(UpdateState::Finished { .. })) } - // pub(super) fn record_transfer(&mut self) { - // if let Some(stats) = self.stats.as_mut() { - // match stats.step { - // RecordingStats::Uninitialized => { - // stats.transfer_time = Some((Instant::now(), None)); - // stats.step = RecordingStats::InitUpdate; - // } - // RecordingStats::InitUpdate => { - // if let Some((_, e)) = stats.transfer_time.as_mut() { - // *e = Some(Instant::now()); - // } - // stats.step = RecordingStats::Completed; - // } - // RecordingStats::Completed => {} - // } - // } - // } - pub(super) fn to_host_result(&self) -> HostResult { if let Some(UpdateState::Finished { key, summary }) = &self.state { Ok(HostResponse::ContractResponse( @@ -65,17 +47,8 @@ impl UpdateOp { struct UpdateStats { target: Option, - // step: RecordingStats, } -// /// While timing, at what particular step we are now. -// #[derive(Clone, Copy, Default)] -// enum RecordingStats { -// #[default] -// Uninitialized, -// Completed, -// } - pub(crate) struct UpdateResult {} impl TryFrom for UpdateResult { diff --git a/crates/core/src/topology.rs b/crates/core/src/topology.rs index c9316c55a..9968d323f 100644 --- a/crates/core/src/topology.rs +++ b/crates/core/src/topology.rs @@ -348,12 +348,10 @@ impl TopologyManager { let decrease_usage_if_above: RateProportion = RateProportion::new(MAXIMUM_DESIRED_RESOURCE_USAGE_PROPORTION); - let usage_rates = self.calculate_usage_proportion(at_time); - - let (resource_type, usage_proportion) = usage_rates.max_usage_rate; + let (resource_type, usage_proportion) = self.calculate_usage_proportion(at_time); // Detailed resource usage information - debug!("Usage proportions: {:?}", usage_rates); + debug!(?usage_proportion, "Resource usage information"); let adjustment: anyhow::Result = if neighbor_locations.len() > self.limits.max_connections { @@ -405,7 +403,7 @@ impl TopologyManager { adjustment.unwrap_or(TopologyAdjustment::NoChange) } - fn calculate_usage_proportion(&mut self, at_time: Instant) -> UsageRates { + fn calculate_usage_proportion(&mut self, at_time: Instant) -> (ResourceType, RateProportion) { let mut usage_rate_per_type = HashMap::new(); for resource_type in ResourceType::all() { let usage = self.extrapolated_usage(&resource_type, at_time); @@ -418,11 +416,7 @@ impl TopologyManager { .max_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap_or(Ordering::Equal)) .unwrap(); - UsageRates { - // TODO: Is there a way to avoid this clone()? - usage_rate_per_type: usage_rate_per_type.clone(), - max_usage_rate: (*max_usage_rate.0, *max_usage_rate.1), - } + (*max_usage_rate.0, *max_usage_rate.1) } /// modify the current connection acquisition strategy @@ -970,10 +964,3 @@ impl Limits { } } } - -#[derive(Debug, Clone)] -struct UsageRates { - #[allow(unused)] - usage_rate_per_type: HashMap, - max_usage_rate: (ResourceType, RateProportion), -} diff --git a/crates/core/src/transport.rs b/crates/core/src/transport.rs index d2638bf7b..06fa33ef6 100644 --- a/crates/core/src/transport.rs +++ b/crates/core/src/transport.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] // TODO: Remove before integration //! Freenet Transport protocol implementation. //! //! Please see `docs/architecture/transport.md` for more information. @@ -22,8 +21,6 @@ type MessagePayload = Vec; type PacketId = u32; -use self::peer_connection::StreamId; - pub use self::crypto::{TransportKeypair, TransportPublicKey}; #[cfg(test)] pub(crate) use self::{ @@ -47,20 +44,14 @@ pub(crate) enum TransportError { ConnectionClosed(SocketAddr), #[error("failed while establishing connection, reason: {cause}")] ConnectionEstablishmentFailure { cause: Cow<'static, str> }, - #[error("incomplete inbound stream: {0}")] - IncompleteInboundStream(StreamId), #[error(transparent)] IO(#[from] std::io::Error), #[error(transparent)] Other(#[from] anyhow::Error), - #[error("{0}")] - PrivateKeyDecryptionError(aes_gcm::aead::Error), #[error(transparent)] PubKeyDecryptionError(#[from] rsa::errors::Error), #[error(transparent)] Serialization(#[from] bincode::Error), - #[error("received unexpected message from remote: {0}")] - UnexpectedMessage(Cow<'static, str>), } /// Make connection handler more testable diff --git a/crates/core/src/transport/connection_handler.rs b/crates/core/src/transport/connection_handler.rs index 458d6cecc..46b375eda 100644 --- a/crates/core/src/transport/connection_handler.rs +++ b/crates/core/src/transport/connection_handler.rs @@ -38,24 +38,7 @@ const MAX_INTERVAL: Duration = Duration::from_millis(5000); // Maximum interval const DEFAULT_BW_TRACKER_WINDOW_SIZE: Duration = Duration::from_secs(10); const BANDWITH_LIMIT: usize = 1024 * 1024 * 10; // 10 MB/s -type ConnectionHandlerMessage = (SocketAddr, Vec); pub type SerializedMessage = Vec; -type PeerChannel = ( - mpsc::Sender, - mpsc::Receiver, -); - -struct OutboundMessage { - remote_addr: SocketAddr, - msg: SerializedMessage, - recv: mpsc::Receiver, -} - -struct GatewayMessage { - remote_addr: SocketAddr, - packet: PacketData, - resp_tx: oneshot::Sender, -} pub(crate) async fn create_connection_handler( keypair: TransportKeypair, @@ -191,16 +174,6 @@ impl OutboundConnectionHandler { } } -pub enum Message { - Short(Vec), - Streamed(Vec, mpsc::Receiver), -} - -pub struct StreamFragment { - pub fragment_number: u32, - pub fragment: Vec, -} - /// Handles UDP transport internally. struct UdpPacketsListener { socket_listener: Arc, @@ -505,8 +478,6 @@ impl UdpPacketsListener { let inbound_conn = InboundRemoteConnection { inbound_packet_sender: inbound_packet_tx, - inbound_intro_packet: None, - inbound_checked_times: 0, }; tracing::debug!("returning connection at gw"); @@ -701,8 +672,6 @@ impl UdpPacketsListener { }, InboundRemoteConnection { inbound_packet_sender: inbound_sender, - inbound_intro_packet: None, - inbound_checked_times: 0, }, )); } @@ -771,8 +740,6 @@ impl UdpPacketsListener { }, InboundRemoteConnection { inbound_packet_sender: inbound_sender, - inbound_intro_packet: Some(intro_packet.clone()), - inbound_checked_times: 0, }, )); } @@ -832,28 +799,6 @@ pub(crate) enum ConnectionEvent { struct InboundRemoteConnection { inbound_packet_sender: mpsc::Sender>, - inbound_intro_packet: Option>, - inbound_checked_times: usize, -} - -impl InboundRemoteConnection { - fn check_inbound_packet(&mut self, packet: &PacketData) -> bool { - let mut inbound = false; - if let Some(inbound_intro_packet) = self.inbound_intro_packet.as_ref() { - if packet.is_intro_packet(inbound_intro_packet) { - inbound = true; - } - } - if self.inbound_checked_times >= UdpPacketsListener::::NAT_TRAVERSAL_MAX_ATTEMPTS - { - // no point in checking more than the max attemps since they won't be sending - // the intro packet more than this amount of times - self.inbound_intro_packet = None; - } else { - self.inbound_checked_times += 1; - } - inbound - } } #[cfg(test)] @@ -1455,7 +1400,7 @@ mod test { async fn simulate_send_short_message() -> anyhow::Result<()> { // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::TRACE), None); #[derive(Clone, Copy)] - struct TestData(&'static str, usize); + struct TestData(&'static str); impl TestFixture for TestData { type Message = String; @@ -1477,7 +1422,7 @@ mod test { peers: 10, ..Default::default() }, - Vec::from_iter((0..10).map(|i| TestData("foo", i))), + Vec::from_iter((0..10).map(|_| TestData("foo"))), ) .await } diff --git a/crates/core/src/transport/crypto.rs b/crates/core/src/transport/crypto.rs index 7742f237d..e39263d41 100644 --- a/crates/core/src/transport/crypto.rs +++ b/crates/core/src/transport/crypto.rs @@ -38,6 +38,7 @@ impl TransportKeypair { &self.public } + #[cfg(test)] pub(crate) fn secret(&self) -> &TransportSecretKey { &self.secret } @@ -93,6 +94,7 @@ impl TransportSecretKey { self.0.decrypt(Pkcs1v15Encrypt, data) } + #[cfg(test)] pub fn to_pkcs8_pem(&self) -> Result, pkcs8::Error> { use pkcs8::EncodePrivateKey; diff --git a/crates/core/src/transport/packet_data.rs b/crates/core/src/transport/packet_data.rs index 3ac7d26c0..d14f33853 100644 --- a/crates/core/src/transport/packet_data.rs +++ b/crates/core/src/transport/packet_data.rs @@ -73,10 +73,6 @@ impl Encryption for SymmetricAES {} impl Encryption for AssymetricRSA {} impl Encryption for UnknownEncryption {} -pub(super) const fn packet_size() -> usize { - DATA_SIZE + NONCE_SIZE + TAG_SIZE -} - fn internal_sym_decryption( data: &[u8], size: usize, @@ -136,10 +132,6 @@ impl PacketData { data_type: PhantomData, } } - - pub fn preparef_send(self) -> Arc<[u8]> { - self.data[..self.size].into() - } } impl PacketData { diff --git a/crates/core/src/transport/peer_connection.rs b/crates/core/src/transport/peer_connection.rs index 4678affea..06eefcaf4 100644 --- a/crates/core/src/transport/peer_connection.rs +++ b/crates/core/src/transport/peer_connection.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; use std::net::SocketAddr; -use std::pin::Pin; use std::sync::atomic::AtomicU32; use std::sync::Arc; use std::time::Duration; @@ -8,7 +7,7 @@ use std::time::Duration; use crate::transport::packet_data::UnknownEncryption; use aes_gcm::Aes128Gcm; use futures::stream::FuturesUnordered; -use futures::{Future, StreamExt}; +use futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tokio::task::JoinHandle; @@ -67,7 +66,6 @@ impl std::fmt::Display for StreamId { } type InboundStreamResult = Result<(StreamId, SerializedMessage), StreamId>; -type InboundStreamFut = Pin + Send>>; /// The `PeerConnection` struct is responsible for managing the connection with a remote peer. /// It provides methods for sending and receiving messages to and from the remote peer. @@ -593,7 +591,7 @@ mod tests { while let Some((_, network_packet)) = receiver.recv().await { let decrypted = PacketData::<_, MAX_PACKET_SIZE>::from_buf(&network_packet) .try_decrypt_sym(&cipher) - .map_err(TransportError::PrivateKeyDecryptionError)?; + .map_err(|e| e.to_string())?; let SymmetricMessage { payload: SymmetricMessagePayload::StreamFragment { diff --git a/crates/core/src/transport/peer_connection/outbound_stream.rs b/crates/core/src/transport/peer_connection/outbound_stream.rs index 6e99f4288..8c4fa37ea 100644 --- a/crates/core/src/transport/peer_connection/outbound_stream.rs +++ b/crates/core/src/transport/peer_connection/outbound_stream.rs @@ -127,7 +127,7 @@ mod tests { while let Some((_, packet)) = outbound_receiver.recv().await { let decrypted_packet = PacketData::<_, MAX_PACKET_SIZE>::from_buf(packet.as_ref()) .try_decrypt_sym(&cipher) - .map_err(TransportError::PrivateKeyDecryptionError)?; + .map_err(|e| e.to_string())?; let deserialized = SymmetricMessage::deser(decrypted_packet.data())?; let SymmetricMessagePayload::StreamFragment { payload, .. } = deserialized.payload else { diff --git a/crates/core/src/transport/sent_packet_tracker.rs b/crates/core/src/transport/sent_packet_tracker.rs index ba887617e..734677a0b 100644 --- a/crates/core/src/transport/sent_packet_tracker.rs +++ b/crates/core/src/transport/sent_packet_tracker.rs @@ -75,15 +75,6 @@ impl SentPacketTracker { } impl SentPacketTracker { - /// Get an estimate of the proportion of outbound packets that were lost This is a value - /// between 0.0 and 1.0, where 0.0 means no packets are lost and 1.0 means all packets are - /// lost. This estimate will be biased towards 0.0 initially, and will converge to the - /// true value over time. It's accuracy will be approximately - /// `PACKET_LOSS_DECAY_FACTOR` (0.001). - pub(super) fn get_recent_packet_loss(&self) -> f64 { - self.packet_loss_proportion - } - pub(super) fn report_sent_packet(&mut self, packet_id: PacketId, payload: Arc<[u8]>) { self.pending_receipts.insert(packet_id, payload); self.resend_queue.push_back(ResendQueueEntry { diff --git a/crates/fdev/Cargo.toml b/crates/fdev/Cargo.toml index 4761d64b3..2cedf1029 100644 --- a/crates/fdev/Cargo.toml +++ b/crates/fdev/Cargo.toml @@ -2,7 +2,7 @@ name = "fdev" version = "0.0.7" edition = "2021" -rust-version = "1.71.1" +rust-version = "1.80" publish = true description = "Freenet development tool" license = "MIT OR Apache-2.0" diff --git a/crates/fdev/src/build.rs b/crates/fdev/src/build.rs index 130dcf232..ceafc7cf4 100644 --- a/crates/fdev/src/build.rs +++ b/crates/fdev/src/build.rs @@ -143,9 +143,8 @@ fn get_out_lib(work_dir: &Path, cli_config: &BuildToolConfig) -> anyhow::Result< "debug" }; let output_lib = env::var("CARGO_TARGET_DIR") - .map_err(|e| { + .inspect_err(|_| { println!("Missing environment variable `CARGO_TARGET_DIR"); - e })? .parse::()? .join(target) diff --git a/crates/fdev/src/testing/network.rs b/crates/fdev/src/testing/network.rs index ab4645d7f..6bfd6e397 100644 --- a/crates/fdev/src/testing/network.rs +++ b/crates/fdev/src/testing/network.rs @@ -26,14 +26,13 @@ use freenet::dev_tool::{ }; use futures::{ stream::{SplitSink, SplitStream}, - SinkExt, StreamExt, + FutureExt, SinkExt, StreamExt, }; use http::{Response, StatusCode}; use thiserror::Error; use tokio::{ process::Command, sync::{oneshot, Mutex}, - task::JoinHandle, }; use super::{Error, TestConfig}; @@ -324,45 +323,18 @@ async fn handle_socket(socket: WebSocket, supervisor: Arc) -> anyhow let (mut sender, mut receiver): (SplitSink, SplitStream) = socket.split(); - // Spawn a task for handling outgoing messages. - let mut sender_task: JoinHandle> = - tokio::spawn( - async move { handle_outgoing_messages(&cloned_supervisor, &mut sender).await }, - ); - - // Spawn a task for handling incoming messages. - let mut receiver_task: JoinHandle> = - tokio::spawn(async move { handle_incoming_messages(&supervisor, &mut receiver).await }); - - // Wait for either the sender or receiver task to complete and then clean up. + let mut sender_task = handle_outgoing_messages(&cloned_supervisor, &mut sender).boxed(); + let mut receiver_task = handle_incoming_messages(&supervisor, &mut receiver).boxed(); tokio::select! { event_s = &mut sender_task => { - match event_s { - Ok(_) => { - tracing::info!("Sender task finished"); - receiver_task.abort(); - Ok(()) - } - Err(e) => { - tracing::error!("Sender task failed: {}", e); - receiver_task.abort(); - Err(e.into()) - } - } + event_s + .inspect_err(|e| tracing::error!("Sender task failed: {e}")) + .inspect(|_| tracing::info!("Sender task finished")) } peer_r = &mut receiver_task => { - match peer_r { - Ok(_) => { - tracing::info!("Receiver task finished"); - sender_task.abort(); - Ok(()) - } - Err(e) => { - tracing::error!("Receiver task failed: {}", e); - sender_task.abort(); - Err(e.into()) - } - } + peer_r + .inspect_err(|e| tracing::error!("Receiver task failed: {e}")) + .inspect(|_| tracing::info!("Receiver task finished")) } } } @@ -373,7 +345,6 @@ async fn handle_outgoing_messages( ) -> anyhow::Result<()> { let mut event_rx = supervisor.user_ev_controller.lock().await.subscribe(); while let Ok((event, peer_id)) = event_rx.recv().await { - tracing::info!("Sending event {} to peer {}", event, peer_id); let serialized_msg: Vec = bincode::serialize(&(event, peer_id.clone())) .map_err(|e| anyhow!("Failed to serialize message: {}", e))?;