Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove async-trait and most of BoxedFuture #1042

Merged
merged 3 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ anyhow = "1"
arc-swap = "1.7"
asynchronous-codec = "0.7"
aes-gcm = "0.10.3"
async-trait = "0.1"
axum = { default-features = false, features = ["http1", "matched-path", "query", "tower-log", "ws", "json"], workspace = true }
bincode = "1.3.3"
blake3 = { workspace = true }
Expand Down
25 changes: 13 additions & 12 deletions crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::collections::HashMap;
use std::fmt::Display;
use std::future::Future;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -314,15 +315,17 @@ mod sealed {
impl ChannelHalve for Callback {}
}

#[async_trait::async_trait]
trait ComposeNetworkMessage<Op>
where
Self: Sized,
Op: Operation + Send + 'static,
{
fn initiate_op(self, op_manager: &OpManager) -> Op;

async fn resume_op(op: Op, op_manager: &OpManager) -> Result<(), OpError>;
fn resume_op(
op: Op,
op_manager: &OpManager,
) -> impl Future<Output = Result<(), OpError>> + Send;
}

#[allow(unused)]
Expand All @@ -331,7 +334,6 @@ struct GetContract {
fetch_contract: bool,
}

#[async_trait::async_trait]
impl ComposeNetworkMessage<operations::get::GetOp> for GetContract {
fn initiate_op(self, _op_manager: &OpManager) -> operations::get::GetOp {
operations::get::start_op(self.key, self.fetch_contract)
Expand All @@ -347,7 +349,6 @@ struct SubscribeContract {
key: ContractKey,
}

#[async_trait::async_trait]
impl ComposeNetworkMessage<operations::subscribe::SubscribeOp> for SubscribeContract {
fn initiate_op(self, _op_manager: &OpManager) -> operations::subscribe::SubscribeOp {
operations::subscribe::start_op(self.key)
Expand All @@ -368,7 +369,6 @@ struct PutContract {
related_contracts: RelatedContracts<'static>,
}

#[async_trait::async_trait]
impl ComposeNetworkMessage<operations::put::PutOp> for PutContract {
fn initiate_op(self, op_manager: &OpManager) -> operations::put::PutOp {
let PutContract {
Expand All @@ -395,7 +395,6 @@ struct UpdateContract {
new_state: WrappedState,
}

#[async_trait::async_trait]
impl ComposeNetworkMessage<operations::update::UpdateOp> for UpdateContract {
fn initiate_op(self, _op_manager: &OpManager) -> operations::update::UpdateOp {
let UpdateContract { key, new_state } = self;
Expand All @@ -411,23 +410,25 @@ impl ComposeNetworkMessage<operations::update::UpdateOp> for UpdateContract {
}
}

#[async_trait::async_trait]
pub(crate) trait ContractExecutor: Send + 'static {
async fn fetch_contract(
fn fetch_contract(
&mut self,
key: ContractKey,
fetch_contract: bool,
) -> Result<(WrappedState, Option<ContractContainer>), ExecutorError>;
) -> impl Future<Output = Result<(WrappedState, Option<ContractContainer>), ExecutorError>> + Send;

async fn store_contract(&mut self, contract: ContractContainer) -> Result<(), ExecutorError>;
fn store_contract(
&mut self,
contract: ContractContainer,
) -> impl Future<Output = Result<(), ExecutorError>> + Send;

async fn upsert_contract_state(
fn upsert_contract_state(
&mut self,
key: ContractKey,
update: Either<WrappedState, StateDelta<'static>>,
related_contracts: RelatedContracts<'static>,
code: Option<ContractContainer>,
) -> Result<WrappedState, ExecutorError>;
) -> impl Future<Output = Result<WrappedState, ExecutorError>> + Send;
}

/// A WASM executor which will run any contracts, delegates, etc. registered.
Expand Down
5 changes: 2 additions & 3 deletions crates/core/src/contract/executor/mock_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ impl Executor<MockRuntime> {
}
}

#[async_trait::async_trait]
impl ContractExecutor for Executor<MockRuntime> {
async fn fetch_contract(
&mut self,
Expand Down Expand Up @@ -99,7 +98,7 @@ impl ContractExecutor for Executor<MockRuntime> {
.store(key, incoming_state.clone(), contract.params().into_owned())
.await
.map_err(ExecutorError::other)?;
return Ok(incoming_state);
Ok(incoming_state)
}
(Either::Left(incoming_state), None) => {
// update case
Expand All @@ -108,7 +107,7 @@ impl ContractExecutor for Executor<MockRuntime> {
.update(&key, incoming_state.clone())
.await
.map_err(ExecutorError::other)?;
return Ok(incoming_state);
Ok(incoming_state)
}
(update, contract) => unreachable!("{update:?}, {contract:?}"),
}
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/contract/executor/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::*;

#[async_trait::async_trait]
impl ContractExecutor for Executor<Runtime> {
async fn fetch_contract(
&mut self,
Expand Down
76 changes: 30 additions & 46 deletions crates/core/src/contract/handler.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::BTreeMap;
use std::future::Future;
use std::hash::Hash;
use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
use std::time::Duration;

use freenet_stdlib::client_api::{ClientError, ClientRequest, HostResponse};
use freenet_stdlib::prelude::*;
use futures::{future::BoxFuture, FutureExt};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};

Expand Down Expand Up @@ -59,7 +59,7 @@ pub(crate) trait ContractHandler {
contract_handler_channel: ContractHandlerChannel<ContractHandlerHalve>,
executor_request_sender: ExecutorToEventLoopChannel<ExecutorHalve>,
builder: Self::Builder,
) -> BoxFuture<'static, Result<Self, DynError>>
) -> impl Future<Output = Result<Self, DynError>> + Send
where
Self: Sized + 'static;

Expand All @@ -72,7 +72,7 @@ pub(crate) trait ContractHandler {
req: ClientRequest<'a>,
client_id: ClientId,
updates: Option<UnboundedSender<Result<HostResponse, ClientError>>>,
) -> BoxFuture<'a, Result<HostResponse, DynError>>;
) -> impl Future<Output = Result<HostResponse, DynError>> + Send + 'a;

fn executor(&mut self) -> &mut Self::ContractExecutor;
}
Expand All @@ -86,39 +86,33 @@ impl ContractHandler for NetworkContractHandler<Runtime> {
type Builder = PeerCliConfig;
type ContractExecutor = Executor<Runtime>;

fn build(
async fn build(
channel: ContractHandlerChannel<ContractHandlerHalve>,
executor_request_sender: ExecutorToEventLoopChannel<ExecutorHalve>,
config: Self::Builder,
) -> BoxFuture<'static, Result<Self, DynError>>
) -> Result<Self, DynError>
where
Self: Sized + 'static,
{
async {
let executor = Executor::from_config(config, Some(executor_request_sender)).await?;
Ok(Self { executor, channel })
}
.boxed()
let executor = Executor::from_config(config, Some(executor_request_sender)).await?;
Ok(Self { executor, channel })
}

fn channel(&mut self) -> &mut ContractHandlerChannel<ContractHandlerHalve> {
&mut self.channel
}

fn handle_request<'a, 's: 'a>(
async fn handle_request<'a, 's: 'a>(
&'s mut self,
req: ClientRequest<'a>,
client_id: ClientId,
updates: Option<UnboundedSender<Result<HostResponse, ClientError>>>,
) -> BoxFuture<'a, Result<HostResponse, DynError>> {
async move {
let res = self
.executor
.handle_request(client_id, req, updates)
.await?;
Ok(res)
}
.boxed()
) -> Result<HostResponse, DynError> {
let res = self
.executor
.handle_request(client_id, req, updates)
.await?;
Ok(res)
}

fn executor(&mut self) -> &mut Self::ContractExecutor {
Expand All @@ -131,39 +125,33 @@ impl ContractHandler for NetworkContractHandler<super::MockRuntime> {
type Builder = String;
type ContractExecutor = Executor<super::MockRuntime>;

fn build(
async fn build(
channel: ContractHandlerChannel<ContractHandlerHalve>,
executor_request_sender: ExecutorToEventLoopChannel<ExecutorHalve>,
identifier: Self::Builder,
) -> BoxFuture<'static, Result<Self, DynError>>
) -> Result<Self, DynError>
where
Self: Sized + 'static,
{
async move {
let executor = Executor::new_mock(&identifier, executor_request_sender).await?;
Ok(Self { executor, channel })
}
.boxed()
let executor = Executor::new_mock(&identifier, executor_request_sender).await?;
Ok(Self { executor, channel })
}

fn channel(&mut self) -> &mut ContractHandlerChannel<ContractHandlerHalve> {
&mut self.channel
}

fn handle_request<'a, 's: 'a>(
async fn handle_request<'a, 's: 'a>(
&'s mut self,
req: ClientRequest<'a>,
client_id: ClientId,
updates: Option<UnboundedSender<Result<HostResponse, ClientError>>>,
) -> BoxFuture<'a, Result<HostResponse, DynError>> {
async move {
let res = self
.executor
.handle_request(client_id, req, updates)
.await?;
Ok(res)
}
.boxed()
) -> Result<HostResponse, DynError> {
let res = self
.executor
.handle_request(client_id, req, updates)
.await?;
Ok(res)
}

fn executor(&mut self) -> &mut Self::ContractExecutor {
Expand Down Expand Up @@ -484,7 +472,6 @@ pub mod test {
pub(super) mod in_memory {
use crate::client_events::ClientId;
use freenet_stdlib::client_api::{ClientError, ClientRequest, HostResponse};
use futures::{future::BoxFuture, FutureExt};
use tokio::sync::mpsc::UnboundedSender;

use super::{
Expand Down Expand Up @@ -520,30 +507,27 @@ pub(super) mod in_memory {
type Builder = String;
type ContractExecutor = Executor<MockRuntime>;

fn build(
async fn build(
channel: ContractHandlerChannel<ContractHandlerHalve>,
executor_request_sender: ExecutorToEventLoopChannel<ExecutorHalve>,
identifier: Self::Builder,
) -> BoxFuture<'static, Result<Self, DynError>>
) -> Result<Self, DynError>
where
Self: Sized + 'static,
{
async move {
Ok(MemoryContractHandler::new(channel, executor_request_sender, &identifier).await)
}
.boxed()
Ok(MemoryContractHandler::new(channel, executor_request_sender, &identifier).await)
}

fn channel(&mut self) -> &mut ContractHandlerChannel<ContractHandlerHalve> {
&mut self.channel
}

fn handle_request<'a, 's: 'a>(
async fn handle_request<'a, 's: 'a>(
&'s mut self,
_req: ClientRequest<'a>,
_client_id: ClientId,
_updates: Option<UnboundedSender<Result<HostResponse, ClientError>>>,
) -> BoxFuture<'static, Result<HostResponse, DynError>> {
) -> Result<HostResponse, DynError> {
unreachable!()
}

Expand Down
9 changes: 5 additions & 4 deletions crates/core/src/node/network_bridge.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Types and definitions to handle all inter-peer communication.

use std::future::Future;
use std::ops::{Deref, DerefMut};

use either::Either;
Expand All @@ -22,13 +23,13 @@ pub(crate) type ConnResult<T> = std::result::Result<T, ConnectionError>;

/// Allows handling of connections to the network as well as sending messages
/// to other peers in the network with whom connection has been established.
#[async_trait::async_trait]
pub(crate) trait NetworkBridge: Send + Sync {
async fn add_connection(&mut self, peer: PeerId) -> ConnResult<()>;
fn add_connection(&mut self, peer: PeerId) -> impl Future<Output = ConnResult<()>> + Send;

async fn drop_connection(&mut self, peer: &PeerId) -> ConnResult<()>;
fn drop_connection(&mut self, peer: &PeerId) -> impl Future<Output = ConnResult<()>> + Send;

async fn send(&self, target: &PeerId, msg: NetMessage) -> ConnResult<()>;
fn send(&self, target: &PeerId, msg: NetMessage)
-> impl Future<Output = ConnResult<()>> + Send;
}

#[derive(Debug, thiserror::Error, Serialize, Deserialize)]
Expand Down
23 changes: 9 additions & 14 deletions crates/core/src/node/network_bridge/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::{
};

use crossbeam::channel::{self, Receiver, Sender};
use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::OnceCell;
use rand::{prelude::StdRng, seq::SliceRandom, Rng, SeedableRng};
use tokio::sync::Mutex;
Expand Down Expand Up @@ -61,7 +60,6 @@ impl MemoryConnManager {
}
}

#[async_trait::async_trait]
impl NetworkBridge for MemoryConnManager {
async fn send(&self, target: &PeerId, msg: NetMessage) -> super::ConnResult<()> {
self.log_register
Expand All @@ -83,19 +81,16 @@ impl NetworkBridge for MemoryConnManager {
}

impl NetworkBridgeExt for MemoryConnManager {
fn recv(&mut self) -> BoxFuture<'_, Result<NetMessage, ConnectionError>> {
async {
loop {
let mut queue = self.msg_queue.lock().await;
let Some(msg) = queue.pop() else {
std::mem::drop(queue);
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
};
return Ok(msg);
}
async fn recv(&mut self) -> Result<NetMessage, ConnectionError> {
loop {
let mut queue = self.msg_queue.lock().await;
let Some(msg) = queue.pop() else {
std::mem::drop(queue);
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
};
return Ok(msg);
}
.boxed()
}
}

Expand Down
Loading
Loading