Skip to content

Commit

Permalink
Get rid of IdentityHandlerExt
Browse files Browse the repository at this point in the history
replace it with `ListenerSetup` trait that is hopefully less confusing to use
  • Loading branch information
akoshelev committed Mar 20, 2024
1 parent 9312a43 commit 0ac8770
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 32 deletions.
5 changes: 2 additions & 3 deletions ipa-core/src/helpers/gateway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use crate::{
gateway::{
receive::GatewayReceivers, send::GatewaySenders, transport::RoleResolvingTransport,
},
HelperChannelId, HelperIdentity, Message, Role, RoleAssignment, RouteId, TotalRecords,
Transport,
HelperChannelId, Message, Role, RoleAssignment, RouteId, TotalRecords, Transport,
},
protocol::QueryId,
};
Expand All @@ -30,7 +29,7 @@ use crate::{
/// To avoid proliferation of type parameters, most code references this concrete type alias, rather
/// than a type parameter `T: Transport`.
#[cfg(feature = "in-memory-infra")]
pub type TransportImpl = super::transport::InMemoryTransport<HelperIdentity>;
pub type TransportImpl = super::transport::InMemoryTransport<crate::helpers::HelperIdentity>;

#[cfg(feature = "real-world-infra")]
pub type TransportImpl = crate::sync::Arc<crate::net::HttpTransport>;
Expand Down
13 changes: 0 additions & 13 deletions ipa-core/src/helpers/transport/in_memory/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,6 @@ pub trait RequestHandler<I: TransportIdentity>: Send {
) -> impl Future<Output = Result<(), Error<I>>> + Send;
}

/// Helper trait to bind in-memory request handlers to transport identity.
pub trait IdentityHandlerExt: TransportIdentity {
type Handler: RequestHandler<Self>;
}

impl IdentityHandlerExt for HelperIdentity {
type Handler = HelperRequestHandler;
}

impl IdentityHandlerExt for ShardIndex {
type Handler = ();
}

impl RequestHandler<ShardIndex> for () {
async fn handle(
&mut self,
Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/helpers/transport/in_memory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub use sharding::InMemoryShardNetwork;
pub use transport::Setup;

use crate::{
helpers::{HelperIdentity, TransportCallbacks},
helpers::{transport::in_memory::transport::ListenerSetup, HelperIdentity, TransportCallbacks},
sync::{Arc, Weak},
};

Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/helpers/transport/in_memory/sharding.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
helpers::{
transport::in_memory::transport::{InMemoryTransport, Setup},
transport::in_memory::transport::{InMemoryTransport, ListenerSetup, Setup},
HelperIdentity,
},
sharding::ShardIndex,
Expand Down
63 changes: 49 additions & 14 deletions ipa-core/src/helpers/transport/in_memory/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use crate::{
error::BoxError,
helpers::{
transport::in_memory::{
handlers::{IdentityHandlerExt, RequestHandler},
handlers::{HelperRequestHandler, RequestHandler},
routing::Addr,
},
NoResourceIdentifier, QueryIdBinding, ReceiveRecords, RouteId, RouteParams, StepBinding,
StreamCollection, Transport,
HelperIdentity, NoResourceIdentifier, QueryIdBinding, ReceiveRecords, RouteId, RouteParams,
StepBinding, StreamCollection, Transport, TransportIdentity,
},
protocol::{step::Gate, QueryId},
sharding::ShardIndex,
sync::{Arc, Weak},
};

Expand Down Expand Up @@ -65,7 +66,7 @@ pub struct InMemoryTransport<I> {
record_streams: StreamCollection<I, InMemoryStream>,
}

impl<I: IdentityHandlerExt> InMemoryTransport<I> {
impl<I: TransportIdentity> InMemoryTransport<I> {
#[must_use]
fn new(identity: I, connections: HashMap<I, ConnectionTx<I>>) -> Self {
Self {
Expand All @@ -84,7 +85,11 @@ impl<I: IdentityHandlerExt> InMemoryTransport<I> {
/// out and processes it, the same way as query processor does. That will allow all tasks to be
/// created in one place (driver). It does not affect the [`Transport`] interface,
/// so I'll leave it as is for now.
fn listen(self: &Arc<Self>, mut callbacks: I::Handler, mut rx: ConnectionRx<I>) {
fn listen<L: ListenerSetup<Identity = I>>(
self: &Arc<Self>,
mut callbacks: L::Handler,
mut rx: ConnectionRx<I>,
) {
tokio::spawn(
{
let streams = self.record_streams.clone();
Expand Down Expand Up @@ -133,7 +138,7 @@ impl<I: IdentityHandlerExt> InMemoryTransport<I> {
}

#[async_trait]
impl<I: IdentityHandlerExt> Transport for Weak<InMemoryTransport<I>> {
impl<I: TransportIdentity> Transport for Weak<InMemoryTransport<I>> {
type Identity = I;
type RecordsStream = ReceiveRecords<I, InMemoryStream>;
type Error = Error<I>;
Expand Down Expand Up @@ -251,7 +256,7 @@ pub struct Setup<I> {
connections: HashMap<I, ConnectionTx<I>>,
}

impl<I: IdentityHandlerExt> Setup<I> {
impl<I: TransportIdentity> Setup<I> {
#[must_use]
pub fn new(identity: I) -> Self {
let (tx, rx) = channel(16);
Expand All @@ -278,19 +283,47 @@ impl<I: IdentityHandlerExt> Setup<I> {
.is_none());
}

fn into_active_conn<H: Into<I::Handler>>(
fn into_active_conn<H: Into<<Self as ListenerSetup>::Handler>>(
self,
callbacks: H,
) -> (ConnectionTx<I>, Arc<InMemoryTransport<I>>) {
) -> (ConnectionTx<I>, Arc<InMemoryTransport<I>>)
where
Self: ListenerSetup<Identity = I>,
{
let transport = Arc::new(InMemoryTransport::new(self.identity, self.connections));
transport.listen(callbacks.into(), self.rx);
transport.listen::<Self>(callbacks.into(), self.rx);

(self.tx, transport)
}
}

#[must_use]
pub fn start<H: Into<I::Handler>>(self, callbacks: H) -> Arc<InMemoryTransport<I>> {
self.into_active_conn(callbacks).1
/// Trait to tie up different transports to the requests handlers they can use inside their
/// listen loop.
pub trait ListenerSetup {
type Identity: TransportIdentity;
type Handler: RequestHandler<Self::Identity> + 'static;
type Listener;

fn start<I: Into<Self::Handler>>(self, handler: I) -> Self::Listener;
}

impl ListenerSetup for Setup<HelperIdentity> {
type Identity = HelperIdentity;
type Handler = HelperRequestHandler;
type Listener = Arc<InMemoryTransport<Self::Identity>>;

fn start<I: Into<Self::Handler>>(self, handler: I) -> Self::Listener {
self.into_active_conn(handler).1
}
}

impl ListenerSetup for Setup<ShardIndex> {
type Identity = ShardIndex;
type Handler = ();
type Listener = Arc<InMemoryTransport<Self::Identity>>;

fn start<I: Into<Self::Handler>>(self, handler: I) -> Self::Listener {
self.into_active_conn(handler).1
}
}

Expand All @@ -314,7 +347,9 @@ mod tests {
helpers::{
query::{QueryConfig, QueryType::TestMultiply},
transport::in_memory::{
transport::{Addr, ConnectionTx, Error, InMemoryStream, InMemoryTransport},
transport::{
Addr, ConnectionTx, Error, InMemoryStream, InMemoryTransport, ListenerSetup,
},
InMemoryMpcNetwork, Setup,
},
HelperIdentity, OrderingSender, RouteId, Transport, TransportCallbacks,
Expand Down

0 comments on commit 0ac8770

Please sign in to comment.