Skip to content

Commit

Permalink
Remove async-trait and most of BoxedFuture
Browse files Browse the repository at this point in the history
  • Loading branch information
al8n committed Apr 15, 2024
1 parent f44c6ad commit 305ce1a
Show file tree
Hide file tree
Showing 18 changed files with 237 additions and 285 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ anyhow = "1"
arc-swap = "1.7"
asynchronous-codec = "0.7"
aes-gcm = "0.10.3"
async-trait = "0.1"
axum = { default-features = false, features = ["http1", "matched-path", "query", "tower-log", "ws", "json"], workspace = true }
bincode = "1.3.3"
blake3 = { workspace = true }
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ impl<'a> OpenRequest<'a> {
}
}

// TODO(al8n): we may cannot remove BoxFuture here, but it requires more changes,
// as async fn in trait will make this trait not Object Safe, so in other places
// we cannot use BoxedClient. We may need to reference the AsyncRead/AsyncWrite trait
// to make the trait object safe.
pub trait ClientEventsProxy {
/// # Cancellation Safety
/// This future must be safe to cancel.
Expand Down
25 changes: 13 additions & 12 deletions crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::future::Future;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -314,15 +315,17 @@ mod sealed {
impl ChannelHalve for Callback {}
}

#[async_trait::async_trait]
trait ComposeNetworkMessage<Op>
where
Self: Sized,
Op: Operation + Send + 'static,
{
fn initiate_op(self, op_manager: &OpManager) -> Op;

async fn resume_op(op: Op, op_manager: &OpManager) -> Result<(), OpError>;
fn resume_op(
op: Op,
op_manager: &OpManager,
) -> impl Future<Output = Result<(), OpError>> + Send;
}

#[allow(unused)]
Expand All @@ -331,7 +334,6 @@ struct GetContract {
fetch_contract: bool,
}

#[async_trait::async_trait]
impl ComposeNetworkMessage<operations::get::GetOp> for GetContract {
fn initiate_op(self, _op_manager: &OpManager) -> operations::get::GetOp {
operations::get::start_op(self.key, self.fetch_contract)
Expand All @@ -347,7 +349,6 @@ struct SubscribeContract {
key: ContractKey,
}

#[async_trait::async_trait]
impl ComposeNetworkMessage<operations::subscribe::SubscribeOp> for SubscribeContract {
fn initiate_op(self, _op_manager: &OpManager) -> operations::subscribe::SubscribeOp {
operations::subscribe::start_op(self.key)
Expand All @@ -368,7 +369,6 @@ struct PutContract {
related_contracts: RelatedContracts<'static>,
}

#[async_trait::async_trait]
impl ComposeNetworkMessage<operations::put::PutOp> for PutContract {
fn initiate_op(self, op_manager: &OpManager) -> operations::put::PutOp {
let PutContract {
Expand All @@ -395,7 +395,6 @@ struct UpdateContract {
new_state: WrappedState,
}

#[async_trait::async_trait]
impl ComposeNetworkMessage<operations::update::UpdateOp> for UpdateContract {
fn initiate_op(self, _op_manager: &OpManager) -> operations::update::UpdateOp {
let UpdateContract { key, new_state } = self;
Expand All @@ -411,23 +410,25 @@ impl ComposeNetworkMessage<operations::update::UpdateOp> for UpdateContract {
}
}

#[async_trait::async_trait]
pub(crate) trait ContractExecutor: Send + 'static {
async fn fetch_contract(
fn fetch_contract(
&mut self,
key: ContractKey,
fetch_contract: bool,
) -> Result<(WrappedState, Option<ContractContainer>), ExecutorError>;
) -> impl Future<Output = Result<(WrappedState, Option<ContractContainer>), ExecutorError>> + Send;

async fn store_contract(&mut self, contract: ContractContainer) -> Result<(), ExecutorError>;
fn store_contract(
&mut self,
contract: ContractContainer,
) -> impl Future<Output = Result<(), ExecutorError>> + Send;

async fn upsert_contract_state(
fn upsert_contract_state(
&mut self,
key: ContractKey,
update: Either<WrappedState, StateDelta<'static>>,
related_contracts: RelatedContracts<'static>,
code: Option<ContractContainer>,
) -> Result<WrappedState, ExecutorError>;
) -> impl Future<Output = Result<WrappedState, ExecutorError>> + Send;
}

/// A WASM executor which will run any contracts, delegates, etc. registered.
Expand Down
5 changes: 2 additions & 3 deletions crates/core/src/contract/executor/mock_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ impl Executor<MockRuntime> {
}
}

#[async_trait::async_trait]
impl ContractExecutor for Executor<MockRuntime> {
async fn fetch_contract(
&mut self,
Expand Down Expand Up @@ -99,7 +98,7 @@ impl ContractExecutor for Executor<MockRuntime> {
.store(key, incoming_state.clone(), contract.params().into_owned())
.await
.map_err(ExecutorError::other)?;
return Ok(incoming_state);
Ok(incoming_state)
}
(Either::Left(incoming_state), None) => {
// update case
Expand All @@ -108,7 +107,7 @@ impl ContractExecutor for Executor<MockRuntime> {
.update(&key, incoming_state.clone())
.await
.map_err(ExecutorError::other)?;
return Ok(incoming_state);
Ok(incoming_state)
}
(update, contract) => unreachable!("{update:?}, {contract:?}"),
}
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/contract/executor/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::*;

#[async_trait::async_trait]
impl ContractExecutor for Executor<Runtime> {
async fn fetch_contract(
&mut self,
Expand Down
76 changes: 30 additions & 46 deletions crates/core/src/contract/handler.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::BTreeMap;
use std::future::Future;
use std::hash::Hash;
use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
use std::time::Duration;

use freenet_stdlib::client_api::{ClientError, ClientRequest, HostResponse};
use freenet_stdlib::prelude::*;
use futures::{future::BoxFuture, FutureExt};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};

Expand Down Expand Up @@ -59,7 +59,7 @@ pub(crate) trait ContractHandler {
contract_handler_channel: ContractHandlerChannel<ContractHandlerHalve>,
executor_request_sender: ExecutorToEventLoopChannel<ExecutorHalve>,
builder: Self::Builder,
) -> BoxFuture<'static, Result<Self, DynError>>
) -> impl Future<Output = Result<Self, DynError>>
where
Self: Sized + 'static;

Expand All @@ -72,7 +72,7 @@ pub(crate) trait ContractHandler {
req: ClientRequest<'a>,
client_id: ClientId,
updates: Option<UnboundedSender<Result<HostResponse, ClientError>>>,
) -> BoxFuture<'a, Result<HostResponse, DynError>>;
) -> impl Future<Output = Result<HostResponse, DynError>> + 'a;

fn executor(&mut self) -> &mut Self::ContractExecutor;
}
Expand All @@ -86,39 +86,33 @@ impl ContractHandler for NetworkContractHandler<Runtime> {
type Builder = PeerCliConfig;
type ContractExecutor = Executor<Runtime>;

fn build(
async fn build(
channel: ContractHandlerChannel<ContractHandlerHalve>,
executor_request_sender: ExecutorToEventLoopChannel<ExecutorHalve>,
config: Self::Builder,
) -> BoxFuture<'static, Result<Self, DynError>>
) -> Result<Self, DynError>
where
Self: Sized + 'static,
{
async {
let executor = Executor::from_config(config, Some(executor_request_sender)).await?;
Ok(Self { executor, channel })
}
.boxed()
let executor = Executor::from_config(config, Some(executor_request_sender)).await?;
Ok(Self { executor, channel })
}

fn channel(&mut self) -> &mut ContractHandlerChannel<ContractHandlerHalve> {
&mut self.channel
}

fn handle_request<'a, 's: 'a>(
async fn handle_request<'a, 's: 'a>(
&'s mut self,
req: ClientRequest<'a>,
client_id: ClientId,
updates: Option<UnboundedSender<Result<HostResponse, ClientError>>>,
) -> BoxFuture<'a, Result<HostResponse, DynError>> {
async move {
let res = self
.executor
.handle_request(client_id, req, updates)
.await?;
Ok(res)
}
.boxed()
) -> Result<HostResponse, DynError> {
let res = self
.executor
.handle_request(client_id, req, updates)
.await?;
Ok(res)
}

fn executor(&mut self) -> &mut Self::ContractExecutor {
Expand All @@ -131,39 +125,33 @@ impl ContractHandler for NetworkContractHandler<super::MockRuntime> {
type Builder = String;
type ContractExecutor = Executor<super::MockRuntime>;

fn build(
async fn build(
channel: ContractHandlerChannel<ContractHandlerHalve>,
executor_request_sender: ExecutorToEventLoopChannel<ExecutorHalve>,
identifier: Self::Builder,
) -> BoxFuture<'static, Result<Self, DynError>>
) -> Result<Self, DynError>
where
Self: Sized + 'static,
{
async move {
let executor = Executor::new_mock(&identifier, executor_request_sender).await?;
Ok(Self { executor, channel })
}
.boxed()
let executor = Executor::new_mock(&identifier, executor_request_sender).await?;
Ok(Self { executor, channel })
}

fn channel(&mut self) -> &mut ContractHandlerChannel<ContractHandlerHalve> {
&mut self.channel
}

fn handle_request<'a, 's: 'a>(
async fn handle_request<'a, 's: 'a>(
&'s mut self,
req: ClientRequest<'a>,
client_id: ClientId,
updates: Option<UnboundedSender<Result<HostResponse, ClientError>>>,
) -> BoxFuture<'a, Result<HostResponse, DynError>> {
async move {
let res = self
.executor
.handle_request(client_id, req, updates)
.await?;
Ok(res)
}
.boxed()
) -> Result<HostResponse, DynError> {
let res = self
.executor
.handle_request(client_id, req, updates)
.await?;
Ok(res)
}

fn executor(&mut self) -> &mut Self::ContractExecutor {
Expand Down Expand Up @@ -484,7 +472,6 @@ pub mod test {
pub(super) mod in_memory {
use crate::client_events::ClientId;
use freenet_stdlib::client_api::{ClientError, ClientRequest, HostResponse};
use futures::{future::BoxFuture, FutureExt};
use tokio::sync::mpsc::UnboundedSender;

use super::{
Expand Down Expand Up @@ -520,30 +507,27 @@ pub(super) mod in_memory {
type Builder = String;
type ContractExecutor = Executor<MockRuntime>;

fn build(
async fn build(
channel: ContractHandlerChannel<ContractHandlerHalve>,
executor_request_sender: ExecutorToEventLoopChannel<ExecutorHalve>,
identifier: Self::Builder,
) -> BoxFuture<'static, Result<Self, DynError>>
) -> Result<Self, DynError>
where
Self: Sized + 'static,
{
async move {
Ok(MemoryContractHandler::new(channel, executor_request_sender, &identifier).await)
}
.boxed()
Ok(MemoryContractHandler::new(channel, executor_request_sender, &identifier).await)
}

fn channel(&mut self) -> &mut ContractHandlerChannel<ContractHandlerHalve> {
&mut self.channel
}

fn handle_request<'a, 's: 'a>(
async fn handle_request<'a, 's: 'a>(
&'s mut self,
_req: ClientRequest<'a>,
_client_id: ClientId,
_updates: Option<UnboundedSender<Result<HostResponse, ClientError>>>,
) -> BoxFuture<'static, Result<HostResponse, DynError>> {
) -> Result<HostResponse, DynError> {
unreachable!()
}

Expand Down
9 changes: 5 additions & 4 deletions crates/core/src/node/network_bridge.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Types and definitions to handle all inter-peer communication.
use std::future::Future;
use std::ops::{Deref, DerefMut};

use either::Either;
Expand All @@ -22,13 +23,13 @@ pub(crate) type ConnResult<T> = std::result::Result<T, ConnectionError>;

/// Allows handling of connections to the network as well as sending messages
/// to other peers in the network with whom connection has been established.
#[async_trait::async_trait]
pub(crate) trait NetworkBridge: Send + Sync {
async fn add_connection(&mut self, peer: PeerId) -> ConnResult<()>;
fn add_connection(&mut self, peer: PeerId) -> impl Future<Output = ConnResult<()>> + Send;

async fn drop_connection(&mut self, peer: &PeerId) -> ConnResult<()>;
fn drop_connection(&mut self, peer: &PeerId) -> impl Future<Output = ConnResult<()>> + Send;

async fn send(&self, target: &PeerId, msg: NetMessage) -> ConnResult<()>;
fn send(&self, target: &PeerId, msg: NetMessage)
-> impl Future<Output = ConnResult<()>> + Send;
}

#[derive(Debug, thiserror::Error, Serialize, Deserialize)]
Expand Down
23 changes: 9 additions & 14 deletions crates/core/src/node/network_bridge/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::{
};

use crossbeam::channel::{self, Receiver, Sender};
use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::OnceCell;
use rand::{prelude::StdRng, seq::SliceRandom, Rng, SeedableRng};
use tokio::sync::Mutex;
Expand Down Expand Up @@ -61,7 +60,6 @@ impl MemoryConnManager {
}
}

#[async_trait::async_trait]
impl NetworkBridge for MemoryConnManager {
async fn send(&self, target: &PeerId, msg: NetMessage) -> super::ConnResult<()> {
self.log_register
Expand All @@ -83,19 +81,16 @@ impl NetworkBridge for MemoryConnManager {
}

impl NetworkBridgeExt for MemoryConnManager {
fn recv(&mut self) -> BoxFuture<'_, Result<NetMessage, ConnectionError>> {
async {
loop {
let mut queue = self.msg_queue.lock().await;
let Some(msg) = queue.pop() else {
std::mem::drop(queue);
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
};
return Ok(msg);
}
async fn recv(&mut self) -> Result<NetMessage, ConnectionError> {
loop {
let mut queue = self.msg_queue.lock().await;
let Some(msg) = queue.pop() else {
std::mem::drop(queue);
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
};
return Ok(msg);
}
.boxed()
}
}

Expand Down
Loading

0 comments on commit 305ce1a

Please sign in to comment.