From 12fdde3284602c16ed896309e7d9af0df1b75aaf Mon Sep 17 00:00:00 2001 From: Al Liu Date: Mon, 15 Apr 2024 15:20:27 +0800 Subject: [PATCH] Remove `async-trait` and most of `BoxedFuture` (#1042) --- Cargo.lock | 1 - crates/core/Cargo.toml | 1 - crates/core/src/contract/executor.rs | 25 ++--- .../src/contract/executor/mock_runtime.rs | 5 +- crates/core/src/contract/executor/runtime.rs | 1 - crates/core/src/contract/handler.rs | 76 ++++++--------- crates/core/src/node/network_bridge.rs | 9 +- .../core/src/node/network_bridge/in_memory.rs | 23 ++--- .../src/node/network_bridge/inter_process.rs | 21 ++-- .../src/node/network_bridge/p2p_protoc.rs | 1 - crates/core/src/node/testing_impl.rs | 4 +- crates/core/src/operations.rs | 4 +- crates/core/src/operations/connect.rs | 96 +++++++++---------- crates/core/src/operations/get.rs | 61 ++++++------ crates/core/src/operations/put.rs | 59 ++++++------ crates/core/src/operations/subscribe.rs | 64 ++++++------- crates/core/src/operations/update.rs | 67 ++++++------- 17 files changed, 233 insertions(+), 285 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 41641ab54..f0e1d56aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1693,7 +1693,6 @@ dependencies = [ "anyhow", "arbitrary", "arc-swap", - "async-trait", "asynchronous-codec 0.7.0", "axum 0.7.5", "bincode", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 4fd2c10fe..feb6def1e 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -17,7 +17,6 @@ anyhow = "1" arc-swap = "1.7" asynchronous-codec = "0.7" aes-gcm = "0.10.3" -async-trait = "0.1" axum = { default-features = false, features = ["http1", "matched-path", "query", "tower-log", "ws", "json"], workspace = true } bincode = "1.3.3" blake3 = { workspace = true } diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index 3f9f88455..ab617c544 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::fmt::Display; +use std::future::Future; use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -314,7 +315,6 @@ mod sealed { impl ChannelHalve for Callback {} } -#[async_trait::async_trait] trait ComposeNetworkMessage where Self: Sized, @@ -322,7 +322,10 @@ where { fn initiate_op(self, op_manager: &OpManager) -> Op; - async fn resume_op(op: Op, op_manager: &OpManager) -> Result<(), OpError>; + fn resume_op( + op: Op, + op_manager: &OpManager, + ) -> impl Future> + Send; } #[allow(unused)] @@ -331,7 +334,6 @@ struct GetContract { fetch_contract: bool, } -#[async_trait::async_trait] impl ComposeNetworkMessage for GetContract { fn initiate_op(self, _op_manager: &OpManager) -> operations::get::GetOp { operations::get::start_op(self.key, self.fetch_contract) @@ -347,7 +349,6 @@ struct SubscribeContract { key: ContractKey, } -#[async_trait::async_trait] impl ComposeNetworkMessage for SubscribeContract { fn initiate_op(self, _op_manager: &OpManager) -> operations::subscribe::SubscribeOp { operations::subscribe::start_op(self.key) @@ -368,7 +369,6 @@ struct PutContract { related_contracts: RelatedContracts<'static>, } -#[async_trait::async_trait] impl ComposeNetworkMessage for PutContract { fn initiate_op(self, op_manager: &OpManager) -> operations::put::PutOp { let PutContract { @@ -395,7 +395,6 @@ struct UpdateContract { new_state: WrappedState, } -#[async_trait::async_trait] impl ComposeNetworkMessage for UpdateContract { fn initiate_op(self, _op_manager: &OpManager) -> operations::update::UpdateOp { let UpdateContract { key, new_state } = self; @@ -411,23 +410,25 @@ impl ComposeNetworkMessage for UpdateContract { } } -#[async_trait::async_trait] pub(crate) trait ContractExecutor: Send + 'static { - async fn fetch_contract( + fn fetch_contract( &mut self, key: ContractKey, fetch_contract: bool, - ) -> Result<(WrappedState, Option), ExecutorError>; + ) -> impl Future), ExecutorError>> + Send; - async fn store_contract(&mut self, contract: ContractContainer) -> Result<(), ExecutorError>; + fn store_contract( + &mut self, + contract: ContractContainer, + ) -> impl Future> + Send; - async fn upsert_contract_state( + fn upsert_contract_state( &mut self, key: ContractKey, update: Either>, related_contracts: RelatedContracts<'static>, code: Option, - ) -> Result; + ) -> impl Future> + Send; } /// A WASM executor which will run any contracts, delegates, etc. registered. diff --git a/crates/core/src/contract/executor/mock_runtime.rs b/crates/core/src/contract/executor/mock_runtime.rs index 8f7671e02..7173214be 100644 --- a/crates/core/src/contract/executor/mock_runtime.rs +++ b/crates/core/src/contract/executor/mock_runtime.rs @@ -40,7 +40,6 @@ impl Executor { } } -#[async_trait::async_trait] impl ContractExecutor for Executor { async fn fetch_contract( &mut self, @@ -99,7 +98,7 @@ impl ContractExecutor for Executor { .store(key, incoming_state.clone(), contract.params().into_owned()) .await .map_err(ExecutorError::other)?; - return Ok(incoming_state); + Ok(incoming_state) } (Either::Left(incoming_state), None) => { // update case @@ -108,7 +107,7 @@ impl ContractExecutor for Executor { .update(&key, incoming_state.clone()) .await .map_err(ExecutorError::other)?; - return Ok(incoming_state); + Ok(incoming_state) } (update, contract) => unreachable!("{update:?}, {contract:?}"), } diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index e559268a0..2ca0b03bc 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -1,6 +1,5 @@ use super::*; -#[async_trait::async_trait] impl ContractExecutor for Executor { async fn fetch_contract( &mut self, diff --git a/crates/core/src/contract/handler.rs b/crates/core/src/contract/handler.rs index a0143ff31..b49ce1fbe 100644 --- a/crates/core/src/contract/handler.rs +++ b/crates/core/src/contract/handler.rs @@ -1,11 +1,11 @@ use std::collections::BTreeMap; +use std::future::Future; use std::hash::Hash; use std::sync::atomic::{AtomicU64, Ordering::SeqCst}; use std::time::Duration; use freenet_stdlib::client_api::{ClientError, ClientRequest, HostResponse}; use freenet_stdlib::prelude::*; -use futures::{future::BoxFuture, FutureExt}; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; @@ -59,7 +59,7 @@ pub(crate) trait ContractHandler { contract_handler_channel: ContractHandlerChannel, executor_request_sender: ExecutorToEventLoopChannel, builder: Self::Builder, - ) -> BoxFuture<'static, Result> + ) -> impl Future> + Send where Self: Sized + 'static; @@ -72,7 +72,7 @@ pub(crate) trait ContractHandler { req: ClientRequest<'a>, client_id: ClientId, updates: Option>>, - ) -> BoxFuture<'a, Result>; + ) -> impl Future> + Send + 'a; fn executor(&mut self) -> &mut Self::ContractExecutor; } @@ -86,39 +86,33 @@ impl ContractHandler for NetworkContractHandler { type Builder = PeerCliConfig; type ContractExecutor = Executor; - fn build( + async fn build( channel: ContractHandlerChannel, executor_request_sender: ExecutorToEventLoopChannel, config: Self::Builder, - ) -> BoxFuture<'static, Result> + ) -> Result where Self: Sized + 'static, { - async { - let executor = Executor::from_config(config, Some(executor_request_sender)).await?; - Ok(Self { executor, channel }) - } - .boxed() + let executor = Executor::from_config(config, Some(executor_request_sender)).await?; + Ok(Self { executor, channel }) } fn channel(&mut self) -> &mut ContractHandlerChannel { &mut self.channel } - fn handle_request<'a, 's: 'a>( + async fn handle_request<'a, 's: 'a>( &'s mut self, req: ClientRequest<'a>, client_id: ClientId, updates: Option>>, - ) -> BoxFuture<'a, Result> { - async move { - let res = self - .executor - .handle_request(client_id, req, updates) - .await?; - Ok(res) - } - .boxed() + ) -> Result { + let res = self + .executor + .handle_request(client_id, req, updates) + .await?; + Ok(res) } fn executor(&mut self) -> &mut Self::ContractExecutor { @@ -131,39 +125,33 @@ impl ContractHandler for NetworkContractHandler { type Builder = String; type ContractExecutor = Executor; - fn build( + async fn build( channel: ContractHandlerChannel, executor_request_sender: ExecutorToEventLoopChannel, identifier: Self::Builder, - ) -> BoxFuture<'static, Result> + ) -> Result where Self: Sized + 'static, { - async move { - let executor = Executor::new_mock(&identifier, executor_request_sender).await?; - Ok(Self { executor, channel }) - } - .boxed() + let executor = Executor::new_mock(&identifier, executor_request_sender).await?; + Ok(Self { executor, channel }) } fn channel(&mut self) -> &mut ContractHandlerChannel { &mut self.channel } - fn handle_request<'a, 's: 'a>( + async fn handle_request<'a, 's: 'a>( &'s mut self, req: ClientRequest<'a>, client_id: ClientId, updates: Option>>, - ) -> BoxFuture<'a, Result> { - async move { - let res = self - .executor - .handle_request(client_id, req, updates) - .await?; - Ok(res) - } - .boxed() + ) -> Result { + let res = self + .executor + .handle_request(client_id, req, updates) + .await?; + Ok(res) } fn executor(&mut self) -> &mut Self::ContractExecutor { @@ -484,7 +472,6 @@ pub mod test { pub(super) mod in_memory { use crate::client_events::ClientId; use freenet_stdlib::client_api::{ClientError, ClientRequest, HostResponse}; - use futures::{future::BoxFuture, FutureExt}; use tokio::sync::mpsc::UnboundedSender; use super::{ @@ -520,30 +507,27 @@ pub(super) mod in_memory { type Builder = String; type ContractExecutor = Executor; - fn build( + async fn build( channel: ContractHandlerChannel, executor_request_sender: ExecutorToEventLoopChannel, identifier: Self::Builder, - ) -> BoxFuture<'static, Result> + ) -> Result where Self: Sized + 'static, { - async move { - Ok(MemoryContractHandler::new(channel, executor_request_sender, &identifier).await) - } - .boxed() + Ok(MemoryContractHandler::new(channel, executor_request_sender, &identifier).await) } fn channel(&mut self) -> &mut ContractHandlerChannel { &mut self.channel } - fn handle_request<'a, 's: 'a>( + async fn handle_request<'a, 's: 'a>( &'s mut self, _req: ClientRequest<'a>, _client_id: ClientId, _updates: Option>>, - ) -> BoxFuture<'static, Result> { + ) -> Result { unreachable!() } diff --git a/crates/core/src/node/network_bridge.rs b/crates/core/src/node/network_bridge.rs index df83a953d..0d27ce18b 100644 --- a/crates/core/src/node/network_bridge.rs +++ b/crates/core/src/node/network_bridge.rs @@ -1,5 +1,6 @@ //! Types and definitions to handle all inter-peer communication. +use std::future::Future; use std::ops::{Deref, DerefMut}; use either::Either; @@ -22,13 +23,13 @@ pub(crate) type ConnResult = std::result::Result; /// Allows handling of connections to the network as well as sending messages /// to other peers in the network with whom connection has been established. -#[async_trait::async_trait] pub(crate) trait NetworkBridge: Send + Sync { - async fn add_connection(&mut self, peer: PeerId) -> ConnResult<()>; + fn add_connection(&mut self, peer: PeerId) -> impl Future> + Send; - async fn drop_connection(&mut self, peer: &PeerId) -> ConnResult<()>; + fn drop_connection(&mut self, peer: &PeerId) -> impl Future> + Send; - async fn send(&self, target: &PeerId, msg: NetMessage) -> ConnResult<()>; + fn send(&self, target: &PeerId, msg: NetMessage) + -> impl Future> + Send; } #[derive(Debug, thiserror::Error, Serialize, Deserialize)] diff --git a/crates/core/src/node/network_bridge/in_memory.rs b/crates/core/src/node/network_bridge/in_memory.rs index 482e91f5a..90405dcd8 100644 --- a/crates/core/src/node/network_bridge/in_memory.rs +++ b/crates/core/src/node/network_bridge/in_memory.rs @@ -7,7 +7,6 @@ use std::{ }; use crossbeam::channel::{self, Receiver, Sender}; -use futures::{future::BoxFuture, FutureExt}; use once_cell::sync::OnceCell; use rand::{prelude::StdRng, seq::SliceRandom, Rng, SeedableRng}; use tokio::sync::Mutex; @@ -61,7 +60,6 @@ impl MemoryConnManager { } } -#[async_trait::async_trait] impl NetworkBridge for MemoryConnManager { async fn send(&self, target: &PeerId, msg: NetMessage) -> super::ConnResult<()> { self.log_register @@ -83,19 +81,16 @@ impl NetworkBridge for MemoryConnManager { } impl NetworkBridgeExt for MemoryConnManager { - fn recv(&mut self) -> BoxFuture<'_, Result> { - async { - loop { - let mut queue = self.msg_queue.lock().await; - let Some(msg) = queue.pop() else { - std::mem::drop(queue); - tokio::time::sleep(Duration::from_millis(10)).await; - continue; - }; - return Ok(msg); - } + async fn recv(&mut self) -> Result { + loop { + let mut queue = self.msg_queue.lock().await; + let Some(msg) = queue.pop() else { + std::mem::drop(queue); + tokio::time::sleep(Duration::from_millis(10)).await; + continue; + }; + return Ok(msg); } - .boxed() } } diff --git a/crates/core/src/node/network_bridge/inter_process.rs b/crates/core/src/node/network_bridge/inter_process.rs index 44c1fa2ed..f50c0244b 100644 --- a/crates/core/src/node/network_bridge/inter_process.rs +++ b/crates/core/src/node/network_bridge/inter_process.rs @@ -1,6 +1,5 @@ use std::sync::{Arc, OnceLock}; -use futures::{future::BoxFuture, FutureExt}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt, BufWriter, Stdout}, sync::{ @@ -71,21 +70,17 @@ impl InterProcessConnManager { } impl NetworkBridgeExt for InterProcessConnManager { - fn recv(&mut self) -> BoxFuture> { - async { - self.recv - .changed() - .await - .map_err(|_| ConnectionError::Timeout)?; - let data = &*self.recv.borrow(); - let deser = bincode::deserialize(data)?; - Ok(deser) - } - .boxed() + async fn recv(&mut self) -> Result { + self.recv + .changed() + .await + .map_err(|_| ConnectionError::Timeout)?; + let data = &*self.recv.borrow(); + let deser = bincode::deserialize(data)?; + Ok(deser) } } -#[async_trait::async_trait] impl NetworkBridge for InterProcessConnManager { async fn send(&self, target: &PeerId, msg: NetMessage) -> super::ConnResult<()> { tracing::debug!(%target, ?msg, "sending network message out"); diff --git a/crates/core/src/node/network_bridge/p2p_protoc.rs b/crates/core/src/node/network_bridge/p2p_protoc.rs index 970af203a..56efa2ff3 100644 --- a/crates/core/src/node/network_bridge/p2p_protoc.rs +++ b/crates/core/src/node/network_bridge/p2p_protoc.rs @@ -146,7 +146,6 @@ impl P2pBridge { } } -#[async_trait::async_trait] impl NetworkBridge for P2pBridge { async fn add_connection(&mut self, peer: FreenetPeerId) -> super::ConnResult<()> { if self.active_net_connections.contains_key(&peer) { diff --git a/crates/core/src/node/testing_impl.rs b/crates/core/src/node/testing_impl.rs index 42b104afb..0bb70aa21 100644 --- a/crates/core/src/node/testing_impl.rs +++ b/crates/core/src/node/testing_impl.rs @@ -9,7 +9,7 @@ use std::{ use either::Either; use freenet_stdlib::prelude::*; -use futures::{future::BoxFuture, Future}; +use futures::Future; use itertools::Itertools; use libp2p::{identity, PeerId as Libp2pPeerId}; use rand::{seq::SliceRandom, Rng}; @@ -965,7 +965,7 @@ use super::op_state_manager::OpManager; use crate::client_events::ClientEventsProxy; pub(super) trait NetworkBridgeExt: Clone + 'static { - fn recv(&mut self) -> BoxFuture>; + fn recv(&mut self) -> impl Future> + Send; } struct RunnerConfig diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs index 273aac4f7..0e0dfe517 100644 --- a/crates/core/src/operations.rs +++ b/crates/core/src/operations.rs @@ -3,7 +3,7 @@ use std::backtrace::Backtrace as StdTrace; use std::{pin::Pin, time::Duration}; use freenet_stdlib::prelude::ContractKey; -use futures::{future::BoxFuture, Future}; +use futures::Future; use tokio::sync::mpsc::error::SendError; use crate::{ @@ -31,7 +31,7 @@ where fn load_or_init<'a>( op_manager: &'a OpManager, msg: &'a Self::Message, - ) -> BoxFuture<'a, Result, OpError>>; + ) -> impl Future, OpError>> + 'a; fn id(&self) -> &Transaction; diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index 60444c062..fa9a380f9 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -1,7 +1,6 @@ //! Operation which seeks new connections in the ring. use freenet_stdlib::client_api::HostResponse; -use futures::future::BoxFuture; -use futures::{Future, FutureExt}; +use futures::Future; use std::pin::Pin; use std::{collections::HashSet, time::Duration}; @@ -61,60 +60,57 @@ impl Operation for ConnectOp { type Message = ConnectMsg; type Result = ConnectResult; - fn load_or_init<'a>( + async fn load_or_init<'a>( op_manager: &'a OpManager, msg: &'a Self::Message, - ) -> BoxFuture<'a, Result, OpError>> { - async move { - let sender; - let tx = *msg.id(); - match op_manager.pop(msg.id()) { - Ok(Some(OpEnum::Connect(connect_op))) => { - sender = msg.sender().cloned(); - // was an existing operation, the other peer messaged back - Ok(OpInitialization { - op: *connect_op, - sender, - }) - } - Ok(Some(op)) => { - let _ = op_manager.push(tx, op).await; - Err(OpError::OpNotPresent(tx)) - } - Ok(None) => { - let gateway = if !matches!( - msg, - ConnectMsg::Request { - msg: ConnectRequest::FindOptimalPeer { .. }, - .. - } - ) { - Some(Box::new(op_manager.ring.own_location())) - } else { - None - }; - // new request to join this node, initialize the state - Ok(OpInitialization { - op: Self { - id: tx, - state: Some(ConnectState::Initializing), - backoff: None, - gateway, - }, - sender: None, - }) - } - Err(err) => { - #[cfg(debug_assertions)] - if matches!(err, crate::node::OpNotAvailable::Completed) { - let target = msg.target(); - tracing::warn!(%tx, peer = ?target, "filtered"); + ) -> Result, OpError> { + let sender; + let tx = *msg.id(); + match op_manager.pop(msg.id()) { + Ok(Some(OpEnum::Connect(connect_op))) => { + sender = msg.sender().cloned(); + // was an existing operation, the other peer messaged back + Ok(OpInitialization { + op: *connect_op, + sender, + }) + } + Ok(Some(op)) => { + let _ = op_manager.push(tx, op).await; + Err(OpError::OpNotPresent(tx)) + } + Ok(None) => { + let gateway = if !matches!( + msg, + ConnectMsg::Request { + msg: ConnectRequest::FindOptimalPeer { .. }, + .. } - Err(err.into()) + ) { + Some(Box::new(op_manager.ring.own_location())) + } else { + None + }; + // new request to join this node, initialize the state + Ok(OpInitialization { + op: Self { + id: tx, + state: Some(ConnectState::Initializing), + backoff: None, + gateway, + }, + sender: None, + }) + } + Err(err) => { + #[cfg(debug_assertions)] + if matches!(err, crate::node::OpNotAvailable::Completed) { + let target = msg.target(); + tracing::warn!(%tx, peer = ?target, "filtered"); } + Err(err.into()) } } - .boxed() } fn id(&self) -> &Transaction { diff --git a/crates/core/src/operations/get.rs b/crates/core/src/operations/get.rs index 08d546470..412f99e13 100644 --- a/crates/core/src/operations/get.rs +++ b/crates/core/src/operations/get.rs @@ -3,8 +3,6 @@ use std::{future::Future, time::Instant}; use freenet_stdlib::client_api::{ErrorKind, HostResponse}; use freenet_stdlib::prelude::*; -use futures::future::BoxFuture; -use futures::FutureExt; use crate::client_events::HostResult; use crate::{ @@ -265,41 +263,38 @@ impl Operation for GetOp { type Message = GetMsg; type Result = GetResult; - fn load_or_init<'a>( + async fn load_or_init<'a>( op_manager: &'a OpManager, msg: &'a Self::Message, - ) -> BoxFuture<'a, Result, OpError>> { - async move { - let mut sender: Option = None; - if let Some(peer_key_loc) = msg.sender().cloned() { - sender = Some(peer_key_loc.peer); - }; - let tx = *msg.id(); - match op_manager.pop(msg.id()) { - Ok(Some(OpEnum::Get(get_op))) => { - Ok(OpInitialization { op: get_op, sender }) - // was an existing operation, other peer messaged back - } - Ok(Some(op)) => { - let _ = op_manager.push(tx, op).await; - Err(OpError::OpNotPresent(tx)) - } - Ok(None) => { - // new request to get a value for a contract, initialize the machine - Ok(OpInitialization { - op: Self { - state: Some(GetState::ReceivedRequest), - id: tx, - result: None, - stats: None, // don't care about stats in target peers - }, - sender, - }) - } - Err(err) => Err(err.into()), + ) -> Result, OpError> { + let mut sender: Option = None; + if let Some(peer_key_loc) = msg.sender().cloned() { + sender = Some(peer_key_loc.peer); + }; + let tx = *msg.id(); + match op_manager.pop(msg.id()) { + Ok(Some(OpEnum::Get(get_op))) => { + Ok(OpInitialization { op: get_op, sender }) + // was an existing operation, other peer messaged back + } + Ok(Some(op)) => { + let _ = op_manager.push(tx, op).await; + Err(OpError::OpNotPresent(tx)) + } + Ok(None) => { + // new request to get a value for a contract, initialize the machine + Ok(OpInitialization { + op: Self { + state: Some(GetState::ReceivedRequest), + id: tx, + result: None, + stats: None, // don't care about stats in target peers + }, + sender, + }) } + Err(err) => Err(err.into()), } - .boxed() } fn id(&self) -> &Transaction { diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 21f87d852..bbb2fb2ba 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -11,8 +11,6 @@ use freenet_stdlib::{ client_api::{ErrorKind, HostResponse}, prelude::*, }; -use futures::future::BoxFuture; -use futures::FutureExt; use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; use crate::{ @@ -140,41 +138,38 @@ impl Operation for PutOp { type Message = PutMsg; type Result = PutResult; - fn load_or_init<'a>( + async fn load_or_init<'a>( op_manager: &'a OpManager, msg: &'a Self::Message, - ) -> BoxFuture<'a, Result, OpError>> { - async move { - let mut sender: Option = None; - if let Some(peer_key_loc) = msg.sender().cloned() { - sender = Some(peer_key_loc.peer); - }; + ) -> Result, OpError> { + let mut sender: Option = None; + if let Some(peer_key_loc) = msg.sender().cloned() { + sender = Some(peer_key_loc.peer); + }; - let tx = *msg.id(); - match op_manager.pop(msg.id()) { - Ok(Some(OpEnum::Put(put_op))) => { - // was an existing operation, the other peer messaged back - Ok(OpInitialization { op: put_op, sender }) - } - Ok(Some(op)) => { - let _ = op_manager.push(tx, op).await; - Err(OpError::OpNotPresent(tx)) - } - Ok(None) => { - // new request to put a new value for a contract, initialize the machine - Ok(OpInitialization { - op: Self { - state: Some(PutState::ReceivedRequest), - stats: None, // don't care for stats in the target peers - id: tx, - }, - sender, - }) - } - Err(err) => Err(err.into()), + let tx = *msg.id(); + match op_manager.pop(msg.id()) { + Ok(Some(OpEnum::Put(put_op))) => { + // was an existing operation, the other peer messaged back + Ok(OpInitialization { op: put_op, sender }) + } + Ok(Some(op)) => { + let _ = op_manager.push(tx, op).await; + Err(OpError::OpNotPresent(tx)) + } + Ok(None) => { + // new request to put a new value for a contract, initialize the machine + Ok(OpInitialization { + op: Self { + state: Some(PutState::ReceivedRequest), + stats: None, // don't care for stats in the target peers + id: tx, + }, + sender, + }) } + Err(err) => Err(err.into()), } - .boxed() } fn id(&self) -> &Transaction { diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index f5c0ace5b..8ed832ca7 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -5,7 +5,6 @@ use freenet_stdlib::{ client_api::{ErrorKind, HostResponse}, prelude::*, }; -use futures::{future::BoxFuture, FutureExt}; use serde::{Deserialize, Serialize}; use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; @@ -140,43 +139,40 @@ impl Operation for SubscribeOp { type Message = SubscribeMsg; type Result = SubscribeResult; - fn load_or_init<'a>( + async fn load_or_init<'a>( op_manager: &'a OpManager, msg: &'a Self::Message, - ) -> BoxFuture<'a, Result, OpError>> { - async move { - let mut sender: Option = None; - if let Some(peer_key_loc) = msg.sender().cloned() { - sender = Some(peer_key_loc.peer); - }; - let id = *msg.id(); - - match op_manager.pop(msg.id()) { - Ok(Some(OpEnum::Subscribe(subscribe_op))) => { - // was an existing operation, the other peer messaged back - Ok(OpInitialization { - op: subscribe_op, - sender, - }) - } - Ok(Some(op)) => { - let _ = op_manager.push(id, op).await; - Err(OpError::OpNotPresent(id)) - } - Ok(None) => { - // new request to subcribe to a contract, initialize the machine - Ok(OpInitialization { - op: Self { - state: Some(SubscribeState::ReceivedRequest), - id, - }, - sender, - }) - } - Err(err) => Err(err.into()), + ) -> Result, OpError> { + let mut sender: Option = None; + if let Some(peer_key_loc) = msg.sender().cloned() { + sender = Some(peer_key_loc.peer); + }; + let id = *msg.id(); + + match op_manager.pop(msg.id()) { + Ok(Some(OpEnum::Subscribe(subscribe_op))) => { + // was an existing operation, the other peer messaged back + Ok(OpInitialization { + op: subscribe_op, + sender, + }) + } + Ok(Some(op)) => { + let _ = op_manager.push(id, op).await; + Err(OpError::OpNotPresent(id)) + } + Ok(None) => { + // new request to subcribe to a contract, initialize the machine + Ok(OpInitialization { + op: Self { + state: Some(SubscribeState::ReceivedRequest), + id, + }, + sender, + }) } + Err(err) => Err(err.into()), } - .boxed() } fn id(&self) -> &Transaction { diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index ce9648794..d52db37df 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -2,8 +2,6 @@ use freenet_stdlib::client_api::{ErrorKind, HostResponse}; use std::time::Instant; // TODO: complete update logic in the network use freenet_stdlib::prelude::*; -use futures::future::BoxFuture; -use futures::FutureExt; use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; use crate::contract::ContractHandlerEvent; @@ -111,44 +109,41 @@ impl Operation for UpdateOp { type Message = UpdateMsg; type Result = UpdateResult; - fn load_or_init<'a>( + async fn load_or_init<'a>( op_manager: &'a crate::node::OpManager, msg: &'a Self::Message, - ) -> BoxFuture<'a, Result, OpError>> { - async move { - let mut sender: Option = None; - if let Some(peer_key_loc) = msg.sender().cloned() { - sender = Some(peer_key_loc.peer); - }; - let tx = *msg.id(); - match op_manager.pop(msg.id()) { - Ok(Some(OpEnum::Update(update_op))) => { - Ok(OpInitialization { - op: update_op, - sender, - }) - // was an existing operation, other peer messaged back - } - Ok(Some(op)) => { - let _ = op_manager.push(tx, op).await; - Err(OpError::OpNotPresent(tx)) - } - Ok(None) => { - // new request to get a value for a contract, initialize the machine - tracing::debug!(tx = %tx, sender = ?sender, "initializing new op"); - Ok(OpInitialization { - op: Self { - state: Some(UpdateState::ReceivedRequest), - id: tx, - stats: None, // don't care about stats in target peers - }, - sender, - }) - } - Err(err) => Err(err.into()), + ) -> Result, OpError> { + let mut sender: Option = None; + if let Some(peer_key_loc) = msg.sender().cloned() { + sender = Some(peer_key_loc.peer); + }; + let tx = *msg.id(); + match op_manager.pop(msg.id()) { + Ok(Some(OpEnum::Update(update_op))) => { + Ok(OpInitialization { + op: update_op, + sender, + }) + // was an existing operation, other peer messaged back + } + Ok(Some(op)) => { + let _ = op_manager.push(tx, op).await; + Err(OpError::OpNotPresent(tx)) + } + Ok(None) => { + // new request to get a value for a contract, initialize the machine + tracing::debug!(tx = %tx, sender = ?sender, "initializing new op"); + Ok(OpInitialization { + op: Self { + state: Some(UpdateState::ReceivedRequest), + id: tx, + stats: None, // don't care about stats in target peers + }, + sender, + }) } + Err(err) => Err(err.into()), } - .boxed() } fn id(&self) -> &crate::message::Transaction {