From 5d740a8665040cf22c5c695d234fcf963386160f Mon Sep 17 00:00:00 2001 From: Panagiotis Ganelis <50522617+PanGan21@users.noreply.github.com> Date: Wed, 20 Sep 2023 04:36:39 +0300 Subject: [PATCH] refactor(core): blanket implementation of connection upgrade Introduces blanket implementation of `{In,Out}boundConnectionUpgrade` and uses it in transport upgrade infrastructure. Resolves: #4307. Pull-Request: #4316. --- core/src/transport/upgrade.rs | 52 +++++++++++++++--------------- core/src/upgrade.rs | 60 +++++++++++++++++++++++++++++++++++ core/src/upgrade/apply.rs | 24 +++++++------- 3 files changed, 98 insertions(+), 38 deletions(-) diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 201918f2635..8525ab741ff 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -30,8 +30,8 @@ use crate::{ TransportError, TransportEvent, }, upgrade::{ - self, apply_inbound, apply_outbound, InboundUpgrade, InboundUpgradeApply, OutboundUpgrade, - OutboundUpgradeApply, UpgradeError, + self, apply_inbound, apply_outbound, InboundConnectionUpgrade, InboundUpgradeApply, + OutboundConnectionUpgrade, OutboundUpgradeApply, UpgradeError, }, Negotiated, }; @@ -101,8 +101,8 @@ where T: Transport, C: AsyncRead + AsyncWrite + Unpin, D: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = (PeerId, D), Error = E>, - U: OutboundUpgrade, Output = (PeerId, D), Error = E> + Clone, + U: InboundConnectionUpgrade, Output = (PeerId, D), Error = E>, + U: OutboundConnectionUpgrade, Output = (PeerId, D), Error = E> + Clone, E: Error + 'static, { let version = self.version; @@ -123,7 +123,7 @@ where pub struct Authenticate where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade> + OutboundUpgrade>, + U: InboundConnectionUpgrade> + OutboundConnectionUpgrade>, { #[pin] inner: EitherUpgrade, @@ -132,11 +132,11 @@ where impl Future for Authenticate where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade> - + OutboundUpgrade< + U: InboundConnectionUpgrade> + + OutboundConnectionUpgrade< Negotiated, - Output = >>::Output, - Error = >>::Error, + Output = >>::Output, + Error = >>::Error, >, { type Output = as Future>::Output; @@ -155,7 +155,7 @@ where pub struct Multiplex where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade> + OutboundUpgrade>, + U: InboundConnectionUpgrade> + OutboundConnectionUpgrade>, { peer_id: Option, #[pin] @@ -165,8 +165,8 @@ where impl Future for Multiplex where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = M, Error = E>, - U: OutboundUpgrade, Output = M, Error = E>, + U: InboundConnectionUpgrade, Output = M, Error = E>, + U: OutboundConnectionUpgrade, Output = M, Error = E>, { type Output = Result<(PeerId, M), UpgradeError>; @@ -208,8 +208,8 @@ where T: Transport, C: AsyncRead + AsyncWrite + Unpin, D: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = D, Error = E>, - U: OutboundUpgrade, Output = D, Error = E> + Clone, + U: InboundConnectionUpgrade, Output = D, Error = E>, + U: OutboundConnectionUpgrade, Output = D, Error = E> + Clone, E: Error + 'static, { Authenticated(Builder::new( @@ -236,8 +236,8 @@ where T: Transport, C: AsyncRead + AsyncWrite + Unpin, M: StreamMuxer, - U: InboundUpgrade, Output = M, Error = E>, - U: OutboundUpgrade, Output = M, Error = E> + Clone, + U: InboundConnectionUpgrade, Output = M, Error = E>, + U: OutboundConnectionUpgrade, Output = M, Error = E> + Clone, E: Error + 'static, { let version = self.0.version; @@ -269,8 +269,8 @@ where T: Transport, C: AsyncRead + AsyncWrite + Unpin, M: StreamMuxer, - U: InboundUpgrade, Output = M, Error = E>, - U: OutboundUpgrade, Output = M, Error = E> + Clone, + U: InboundConnectionUpgrade, Output = M, Error = E>, + U: OutboundConnectionUpgrade, Output = M, Error = E> + Clone, E: Error + 'static, F: for<'a> FnOnce(&'a PeerId, &'a ConnectedPoint) -> U + Clone, { @@ -395,8 +395,8 @@ where T: Transport, T::Error: 'static, C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = D, Error = E>, - U: OutboundUpgrade, Output = D, Error = E> + Clone, + U: InboundConnectionUpgrade, Output = D, Error = E>, + U: OutboundConnectionUpgrade, Output = D, Error = E> + Clone, E: Error + 'static, { type Output = (PeerId, D); @@ -502,7 +502,7 @@ where /// The [`Transport::Dial`] future of an [`Upgrade`]d transport. pub struct DialUpgradeFuture where - U: OutboundUpgrade>, + U: OutboundConnectionUpgrade>, C: AsyncRead + AsyncWrite + Unpin, { future: Pin>, @@ -513,7 +513,7 @@ impl Future for DialUpgradeFuture where F: TryFuture, C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade, Output = D>, + U: OutboundConnectionUpgrade, Output = D>, U::Error: Error, { type Output = Result<(PeerId, D), TransportUpgradeError>; @@ -553,7 +553,7 @@ where impl Unpin for DialUpgradeFuture where - U: OutboundUpgrade>, + U: OutboundConnectionUpgrade>, C: AsyncRead + AsyncWrite + Unpin, { } @@ -562,7 +562,7 @@ where pub struct ListenerUpgradeFuture where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade>, + U: InboundConnectionUpgrade>, { future: Pin>, upgrade: future::Either, (PeerId, InboundUpgradeApply)>, @@ -572,7 +572,7 @@ impl Future for ListenerUpgradeFuture where F: TryFuture, C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = D>, + U: InboundConnectionUpgrade, Output = D>, U::Error: Error, { type Output = Result<(PeerId, D), TransportUpgradeError>; @@ -613,6 +613,6 @@ where impl Unpin for ListenerUpgradeFuture where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade>, + U: InboundConnectionUpgrade>, { } diff --git a/core/src/upgrade.rs b/core/src/upgrade.rs index f6bf72d1f4a..7db1853b56c 100644 --- a/core/src/upgrade.rs +++ b/core/src/upgrade.rs @@ -125,3 +125,63 @@ pub trait OutboundUpgrade: UpgradeInfo { /// The `info` is the identifier of the protocol, as produced by `protocol_info`. fn upgrade_outbound(self, socket: C, info: Self::Info) -> Self::Future; } + +/// Possible upgrade on an inbound connection +pub trait InboundConnectionUpgrade: UpgradeInfo { + /// Output after the upgrade has been successfully negotiated and the handshake performed. + type Output; + /// Possible error during the handshake. + type Error; + /// Future that performs the handshake with the remote. + type Future: Future>; + + /// After we have determined that the remote supports one of the protocols we support, this + /// method is called to start the handshake. + /// + /// The `info` is the identifier of the protocol, as produced by `protocol_info`. + fn upgrade_inbound(self, socket: T, info: Self::Info) -> Self::Future; +} + +/// Possible upgrade on an outbound connection +pub trait OutboundConnectionUpgrade: UpgradeInfo { + /// Output after the upgrade has been successfully negotiated and the handshake performed. + type Output; + /// Possible error during the handshake. + type Error; + /// Future that performs the handshake with the remote. + type Future: Future>; + + /// After we have determined that the remote supports one of the protocols we support, this + /// method is called to start the handshake. + /// + /// The `info` is the identifier of the protocol, as produced by `protocol_info`. + fn upgrade_outbound(self, socket: T, info: Self::Info) -> Self::Future; +} + +// Blanket implementation for InboundConnectionUpgrade based on InboundUpgrade for backwards compatibility +impl InboundConnectionUpgrade for U +where + U: InboundUpgrade, +{ + type Output = >::Output; + type Error = >::Error; + type Future = >::Future; + + fn upgrade_inbound(self, socket: T, info: Self::Info) -> Self::Future { + self.upgrade_inbound(socket, info) + } +} + +// Blanket implementation for OutboundConnectionUpgrade based on OutboundUpgrade for backwards compatibility +impl OutboundConnectionUpgrade for U +where + U: OutboundUpgrade, +{ + type Output = >::Output; + type Error = >::Error; + type Future = >::Future; + + fn upgrade_outbound(self, socket: T, info: Self::Info) -> Self::Future { + self.upgrade_outbound(socket, info) + } +} diff --git a/core/src/upgrade/apply.rs b/core/src/upgrade/apply.rs index aa997435673..aefce686f01 100644 --- a/core/src/upgrade/apply.rs +++ b/core/src/upgrade/apply.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError}; +use crate::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade, UpgradeError}; use crate::{connection::ConnectedPoint, Negotiated}; use futures::{future::Either, prelude::*}; use log::debug; @@ -37,7 +37,7 @@ pub(crate) fn apply( ) -> Either, OutboundUpgradeApply> where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade> + OutboundUpgrade>, + U: InboundConnectionUpgrade> + OutboundConnectionUpgrade>, { match cp { ConnectedPoint::Dialer { role_override, .. } if role_override.is_dialer() => { @@ -51,7 +51,7 @@ where pub(crate) fn apply_inbound(conn: C, up: U) -> InboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade>, + U: InboundConnectionUpgrade>, { InboundUpgradeApply { inner: InboundUpgradeApplyState::Init { @@ -65,7 +65,7 @@ where pub(crate) fn apply_outbound(conn: C, up: U, v: Version) -> OutboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade>, + U: OutboundConnectionUpgrade>, { OutboundUpgradeApply { inner: OutboundUpgradeApplyState::Init { @@ -79,7 +79,7 @@ where pub struct InboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade>, + U: InboundConnectionUpgrade>, { inner: InboundUpgradeApplyState, } @@ -88,7 +88,7 @@ where enum InboundUpgradeApplyState where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade>, + U: InboundConnectionUpgrade>, { Init { future: ListenerSelectFuture, @@ -104,14 +104,14 @@ where impl Unpin for InboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade>, + U: InboundConnectionUpgrade>, { } impl Future for InboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade>, + U: InboundConnectionUpgrade>, { type Output = Result>; @@ -162,7 +162,7 @@ where pub struct OutboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade>, + U: OutboundConnectionUpgrade>, { inner: OutboundUpgradeApplyState, } @@ -170,7 +170,7 @@ where enum OutboundUpgradeApplyState where C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade>, + U: OutboundConnectionUpgrade>, { Init { future: DialerSelectFuture::IntoIter>, @@ -186,14 +186,14 @@ where impl Unpin for OutboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade>, + U: OutboundConnectionUpgrade>, { } impl Future for OutboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade>, + U: OutboundConnectionUpgrade>, { type Output = Result>;