diff --git a/consensus/src/dag/mod.rs b/consensus/src/dag/mod.rs index 7b96f4384e3153..4efc061a5f3cbd 100644 --- a/consensus/src/dag/mod.rs +++ b/consensus/src/dag/mod.rs @@ -16,5 +16,5 @@ mod storage; mod tests; mod types; -pub use dag_network::RpcHandler; -pub use types::{CertifiedNode, DAGNetworkMessage, Node, NodeId, Vote}; +pub use dag_network::{RpcHandler, DAGNetworkSender, RpcWithFallback}; +pub use types::{CertifiedNode, DAGNetworkMessage, Node, NodeId, Vote, DAGMessage}; diff --git a/consensus/src/network.rs b/consensus/src/network.rs index 93f59c1c1111cd..6fc317af71f3e6 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -5,7 +5,7 @@ use crate::{ block_storage::tracing::{observe_block, BlockStage}, counters, - dag::DAGNetworkMessage, + dag::{DAGMessage, DAGNetworkMessage, DAGNetworkSender, RpcWithFallback}, logging::LogEvent, monitor, network_interface::{ConsensusMsg, ConsensusNetworkClient}, @@ -29,10 +29,12 @@ use aptos_network::{ protocols::{network::Event, rpc::error::RpcError}, ProtocolId, }; +use aptos_reliable_broadcast::{RBMessage, RBNetworkSender}; use aptos_types::{ account_address::AccountAddress, epoch_change::EpochChangeProof, ledger_info::LedgerInfoWithSignatures, validator_verifier::ValidatorVerifier, }; +use async_trait::async_trait; use bytes::Bytes; use fail::fail_point; use futures::{ @@ -43,6 +45,7 @@ use futures::{ use serde::{de::DeserializeOwned, Serialize}; use std::{ mem::{discriminant, Discriminant}, + sync::Arc, time::Duration, }; @@ -404,6 +407,79 @@ impl QuorumStoreSender for NetworkSender { } } +// TODO: this can be improved +#[derive(Clone)] +pub struct DAGNetworkSenderImpl { + sender: Arc, + time_service: aptos_time_service::TimeService, +} + +impl DAGNetworkSenderImpl { + pub fn new(sender: Arc) -> Self { + Self { + sender, + time_service: aptos_time_service::TimeService::real(), + } + } +} + +#[async_trait] +impl DAGNetworkSender for DAGNetworkSenderImpl { + async fn send_rpc( + &self, + receiver: Author, + message: DAGMessage, + timeout: Duration, + ) -> anyhow::Result { + self.sender + .consensus_network_client + .send_rpc(receiver, message.into_network_message(), timeout) + .await + .map_err(|e| anyhow!("invalid rpc response: {}", e)) + .and_then(|msg| TConsensusMsg::from_network_message(msg)) + } + + /// Given a list of potential responders, sending rpc to get response from any of them and could + /// fallback to more in case of failures. + async fn send_rpc_with_fallbacks( + &self, + responders: Vec, + message: DAGMessage, + retry_interval: Duration, + rpc_timeout: Duration, + ) -> RpcWithFallback { + let sender = Arc::new(self.clone()); + RpcWithFallback::new( + responders, + message, + retry_interval, + rpc_timeout, + sender, + self.time_service.clone(), + ) + } +} + +#[async_trait] +impl RBNetworkSender for DAGNetworkSenderImpl +where + M: RBMessage + TConsensusMsg + 'static, +{ + async fn send_rb_rpc( + &self, + receiver: Author, + message: M, + timeout: Duration, + ) -> anyhow::Result { + self.sender + .consensus_network_client + .send_rpc(receiver, message.into_network_message(), timeout) + .await + .map_err(|e| anyhow!("invalid rpc response: {}", e)) + .and_then(|msg| TConsensusMsg::from_network_message(msg)) + } +} + pub struct NetworkTask { consensus_messages_tx: aptos_channel::Sender< (AccountAddress, Discriminant),