Skip to content

Commit

Permalink
chore: rustfmt.toml
Browse files Browse the repository at this point in the history
  • Loading branch information
kamuikatsurgi committed Nov 27, 2024
1 parent c9c44b1 commit 92af583
Show file tree
Hide file tree
Showing 369 changed files with 8,533 additions and 5,658 deletions.
2 changes: 1 addition & 1 deletion core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

- Update `Transport::dial` function signature with a `DialOpts` param and remove `Transport::dial_as_listener`:
- `DialOpts` struct contains `PortUse` and `Endpoint`,
- `PortUse` allows controlling port allocation of new connections (defaults to `PortUse::Reuse`) -
- `PortUse` allows controling port allocation of new connections (defaults to `PortUse::Reuse`) -
- Add `port_use` field to `ConnectedPoint`
- Set `endpoint` field in `DialOpts` to `Endpoint::Listener` to dial as a listener
- Remove `Transport::address_translation` and relocate functionality to `libp2p_swarm`
Expand Down
22 changes: 13 additions & 9 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,14 @@ pub enum ConnectedPoint {
/// connection as a dialer and one peer dial the other and upgrade the
/// connection _as a listener_ overriding its role.
role_override: Endpoint,
/// Whether the port for the outgoing connection was reused from a listener
/// or a new port was allocated. This is useful for address translation.
/// Whether the port for the outgoing connection was reused from a
/// listener or a new port was allocated. This is useful for
/// address translation.
///
/// The port use is implemented on a best-effort basis. It is not guaranteed
/// that [`PortUse::Reuse`] actually reused a port. A good example is the case
/// where there is no listener available to reuse a port from.
/// The port use is implemented on a best-effort basis. It is not
/// guaranteed that [`PortUse::Reuse`] actually reused a port. A
/// good example is the case where there is no listener
/// available to reuse a port from.
port_use: PortUse,
},
/// We received the node.
Expand Down Expand Up @@ -153,10 +155,11 @@ impl ConnectedPoint {

/// Returns the address of the remote stored in this struct.
///
/// For `Dialer`, this returns `address`. For `Listener`, this returns `send_back_addr`.
/// For `Dialer`, this returns `address`. For `Listener`, this returns
/// `send_back_addr`.
///
/// Note that the remote node might not be listening on this address and hence the address might
/// not be usable to establish new connections.
/// Note that the remote node might not be listening on this address and
/// hence the address might not be usable to establish new connections.
pub fn get_remote_address(&self) -> &Multiaddr {
match self {
ConnectedPoint::Dialer { address, .. } => address,
Expand All @@ -166,7 +169,8 @@ impl ConnectedPoint {

/// Modifies the address of the remote stored in this struct.
///
/// For `Dialer`, this modifies `address`. For `Listener`, this modifies `send_back_addr`.
/// For `Dialer`, this modifies `address`. For `Listener`, this modifies
/// `send_back_addr`.
pub fn set_remote_address(&mut self, new_address: Multiaddr) {
match self {
ConnectedPoint::Dialer { address, .. } => *address = new_address,
Expand Down
20 changes: 12 additions & 8 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::muxing::StreamMuxerEvent;
use crate::transport::DialOpts;
use crate::{
muxing::StreamMuxer,
transport::{ListenerId, Transport, TransportError, TransportEvent},
Multiaddr,
use std::{
pin::Pin,
task::{Context, Poll},
};

use either::Either;
use futures::prelude::*;
use pin_project::pin_project;
use std::{pin::Pin, task::Context, task::Poll};

use crate::{
muxing::{StreamMuxer, StreamMuxerEvent},
transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent},
Multiaddr,
};

impl<A, B> StreamMuxer for future::Either<A, B>
where
Expand Down Expand Up @@ -88,7 +91,8 @@ where
}
}

/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
/// Implements `Future` and dispatches all method calls to either `First` or
/// `Second`.
#[pin_project(project = EitherFutureProj)]
#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
Expand Down
7 changes: 4 additions & 3 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@
//! to a remote and can subdivide this connection into multiple substreams.
//! See the [`muxing`] module.
//! - The [`UpgradeInfo`], [`InboundUpgrade`] and [`OutboundUpgrade`] traits
//! define how to upgrade each individual substream to use a protocol.
//! See the `upgrade` module.
//! define how to upgrade each individual substream to use a protocol. See the
//! `upgrade` module.

#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

mod proto {
#![allow(unreachable_pub)]
include!("generated/mod.rs");
pub use self::{
envelope_proto::*, peer_record_proto::mod_PeerRecord::*, peer_record_proto::PeerRecord,
envelope_proto::*,
peer_record_proto::{mod_PeerRecord::*, PeerRecord},
};
}

Expand Down
112 changes: 66 additions & 46 deletions core/src/muxing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,63 +20,76 @@

//! Muxing is the process of splitting a connection into multiple substreams.
//!
//! The main item of this module is the `StreamMuxer` trait. An implementation of `StreamMuxer`
//! has ownership of a connection, lets you open and close substreams.
//! The main item of this module is the `StreamMuxer` trait. An implementation
//! of `StreamMuxer` has ownership of a connection, lets you open and close
//! substreams.
//!
//! > **Note**: You normally don't need to use the methods of the `StreamMuxer` directly, as this
//! > is managed by the library's internals.
//! > **Note**: You normally don't need to use the methods of the `StreamMuxer`
//! > directly, as this
//! > is managed by the library's internals.
//!
//! Each substream of a connection is an isolated stream of data. All the substreams are muxed
//! together so that the data read from or written to each substream doesn't influence the other
//! substreams.
//! Each substream of a connection is an isolated stream of data. All the
//! substreams are muxed together so that the data read from or written to each
//! substream doesn't influence the other substreams.
//!
//! In the context of libp2p, each substream can use a different protocol. Contrary to opening a
//! connection, opening a substream is almost free in terms of resources. This means that you
//! shouldn't hesitate to rapidly open and close substreams, and to design protocols that don't
//! require maintaining long-lived channels of communication.
//! In the context of libp2p, each substream can use a different protocol.
//! Contrary to opening a connection, opening a substream is almost free in
//! terms of resources. This means that you shouldn't hesitate to rapidly open
//! and close substreams, and to design protocols that don't require maintaining
//! long-lived channels of communication.
//!
//! > **Example**: The Kademlia protocol opens a new substream for each request it wants to
//! > perform. Multiple requests can be performed simultaneously by opening multiple
//! > substreams, without having to worry about associating responses with the
//! > right request.
//! > **Example**: The Kademlia protocol opens a new substream for each request
//! > it wants to
//! > perform. Multiple requests can be performed simultaneously by opening
//! > multiple
//! > substreams, without having to worry about associating responses with the
//! > right request.
//!
//! # Implementing a muxing protocol
//!
//! In order to implement a muxing protocol, create an object that implements the `UpgradeInfo`,
//! `InboundUpgrade` and `OutboundUpgrade` traits. See the `upgrade` module for more information.
//! The `Output` associated type of the `InboundUpgrade` and `OutboundUpgrade` traits should be
//! identical, and should be an object that implements the `StreamMuxer` trait.
//! In order to implement a muxing protocol, create an object that implements
//! the `UpgradeInfo`, `InboundUpgrade` and `OutboundUpgrade` traits. See the
//! `upgrade` module for more information. The `Output` associated type of the
//! `InboundUpgrade` and `OutboundUpgrade` traits should be identical, and
//! should be an object that implements the `StreamMuxer` trait.
//!
//! The upgrade process will take ownership of the connection, which makes it possible for the
//! implementation of `StreamMuxer` to control everything that happens on the wire.
//! The upgrade process will take ownership of the connection, which makes it
//! possible for the implementation of `StreamMuxer` to control everything that
//! happens on the wire.

use std::{future::Future, pin::Pin};

use futures::{task::Context, task::Poll, AsyncRead, AsyncWrite};
use futures::{
task::{Context, Poll},
AsyncRead,
AsyncWrite,
};
use multiaddr::Multiaddr;
use std::future::Future;
use std::pin::Pin;

pub use self::boxed::StreamMuxerBox;
pub use self::boxed::SubstreamBox;
pub use self::boxed::{StreamMuxerBox, SubstreamBox};

mod boxed;

/// Provides multiplexing for a connection by allowing users to open substreams.
///
/// A substream created by a [`StreamMuxer`] is a type that implements [`AsyncRead`] and [`AsyncWrite`].
/// The [`StreamMuxer`] itself is modelled closely after [`AsyncWrite`]. It features `poll`-style
/// functions that allow the implementation to make progress on various tasks.
/// A substream created by a [`StreamMuxer`] is a type that implements
/// [`AsyncRead`] and [`AsyncWrite`]. The [`StreamMuxer`] itself is modelled
/// closely after [`AsyncWrite`]. It features `poll`-style functions that allow
/// the implementation to make progress on various tasks.
pub trait StreamMuxer {
/// Type of the object that represents the raw substream where data can be read and written.
/// Type of the object that represents the raw substream where data can be
/// read and written.
type Substream: AsyncRead + AsyncWrite;

/// Error type of the muxer
type Error: std::error::Error;

/// Poll for new inbound substreams.
///
/// This function should be called whenever callers are ready to accept more inbound streams. In
/// other words, callers may exercise back-pressure on incoming streams by not calling this
/// function if a certain limit is hit.
/// This function should be called whenever callers are ready to accept more
/// inbound streams. In other words, callers may exercise back-pressure
/// on incoming streams by not calling this function if a certain limit
/// is hit.
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -90,20 +103,23 @@ pub trait StreamMuxer {

/// Poll to close this [`StreamMuxer`].
///
/// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless and may be safely
/// dropped.
/// After this has returned `Poll::Ready(Ok(()))`, the muxer has become
/// useless and may be safely dropped.
///
/// > **Note**: You are encouraged to call this method and wait for it to return `Ready`, so
/// > that the remote is properly informed of the shutdown. However, apart from
/// > properly informing the remote, there is no difference between this and
/// > immediately dropping the muxer.
/// > **Note**: You are encouraged to call this method and wait for it to
/// > return `Ready`, so
/// > that the remote is properly informed of the shutdown. However, apart
/// > from
/// > properly informing the remote, there is no difference between this and
/// > immediately dropping the muxer.
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

/// Poll to allow the underlying connection to make progress.
///
/// In contrast to all other `poll`-functions on [`StreamMuxer`], this function MUST be called
/// unconditionally. Because it will be called regardless, this function can be used by
/// implementations to return events about the underlying connection that the caller MUST deal
/// In contrast to all other `poll`-functions on [`StreamMuxer`], this
/// function MUST be called unconditionally. Because it will be called
/// regardless, this function can be used by implementations to return
/// events about the underlying connection that the caller MUST deal
/// with.
fn poll(
self: Pin<&mut Self>,
Expand All @@ -120,7 +136,8 @@ pub enum StreamMuxerEvent {

/// Extension trait for [`StreamMuxer`].
pub trait StreamMuxerExt: StreamMuxer + Sized {
/// Convenience function for calling [`StreamMuxer::poll_inbound`] for [`StreamMuxer`]s that are `Unpin`.
/// Convenience function for calling [`StreamMuxer::poll_inbound`] for
/// [`StreamMuxer`]s that are `Unpin`.
fn poll_inbound_unpin(
&mut self,
cx: &mut Context<'_>,
Expand All @@ -131,7 +148,8 @@ pub trait StreamMuxerExt: StreamMuxer + Sized {
Pin::new(self).poll_inbound(cx)
}

/// Convenience function for calling [`StreamMuxer::poll_outbound`] for [`StreamMuxer`]s that are `Unpin`.
/// Convenience function for calling [`StreamMuxer::poll_outbound`] for
/// [`StreamMuxer`]s that are `Unpin`.
fn poll_outbound_unpin(
&mut self,
cx: &mut Context<'_>,
Expand All @@ -142,15 +160,17 @@ pub trait StreamMuxerExt: StreamMuxer + Sized {
Pin::new(self).poll_outbound(cx)
}

/// Convenience function for calling [`StreamMuxer::poll`] for [`StreamMuxer`]s that are `Unpin`.
/// Convenience function for calling [`StreamMuxer::poll`] for
/// [`StreamMuxer`]s that are `Unpin`.
fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent, Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll(cx)
}

/// Convenience function for calling [`StreamMuxer::poll_close`] for [`StreamMuxer`]s that are `Unpin`.
/// Convenience function for calling [`StreamMuxer::poll_close`] for
/// [`StreamMuxer`]s that are `Unpin`.
fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where
Self: Unpin,
Expand Down
28 changes: 17 additions & 11 deletions core/src/muxing/boxed.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use crate::muxing::{StreamMuxer, StreamMuxerEvent};
use std::{
error::Error,
fmt,
io,
io::{IoSlice, IoSliceMut},
pin::Pin,
task::{Context, Poll},
};

use futures::{AsyncRead, AsyncWrite};
use pin_project::pin_project;
use std::error::Error;
use std::fmt;
use std::io;
use std::io::{IoSlice, IoSliceMut};
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::muxing::{StreamMuxer, StreamMuxerEvent};

/// Abstract `StreamMuxer`.
pub struct StreamMuxerBox {
Expand All @@ -21,8 +25,8 @@ impl fmt::Debug for StreamMuxerBox {

/// Abstract type for asynchronous reading and writing.
///
/// A [`SubstreamBox`] erases the concrete type it is given and only retains its `AsyncRead`
/// and `AsyncWrite` capabilities.
/// A [`SubstreamBox`] erases the concrete type it is given and only retains its
/// `AsyncRead` and `AsyncWrite` capabilities.
pub struct SubstreamBox(Pin<Box<dyn AsyncReadWrite + Send>>);

#[pin_project]
Expand Down Expand Up @@ -139,7 +143,8 @@ impl StreamMuxer for StreamMuxerBox {
}

impl SubstreamBox {
/// Construct a new [`SubstreamBox`] from something that implements [`AsyncRead`] and [`AsyncWrite`].
/// Construct a new [`SubstreamBox`] from something that implements
/// [`AsyncRead`] and [`AsyncWrite`].
pub fn new<S: AsyncRead + AsyncWrite + Send + 'static>(stream: S) -> Self {
Self(Box::pin(stream))
}
Expand All @@ -155,7 +160,8 @@ impl fmt::Debug for SubstreamBox {
trait AsyncReadWrite: AsyncRead + AsyncWrite {
/// Helper function to capture the erased inner type.
///
/// Used to make the [`Debug`] implementation of [`SubstreamBox`] more useful.
/// Used to make the [`Debug`] implementation of [`SubstreamBox`] more
/// useful.
fn type_name(&self) -> &'static str;
}

Expand Down
Loading

0 comments on commit 92af583

Please sign in to comment.