From 9df99382a69ef08781337e3a626d9d44e04dcb88 Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Tue, 1 Aug 2023 20:11:52 -0700 Subject: [PATCH] [dag] Integrate Dag Fetcher with Dag Driver --- consensus/src/dag/dag_driver.rs | 11 +++- consensus/src/dag/dag_fetcher.rs | 64 ++++++++++++++++++--- consensus/src/dag/dag_handler.rs | 17 +++++- consensus/src/dag/tests/dag_driver_tests.rs | 9 ++- 4 files changed, 88 insertions(+), 13 deletions(-) diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs index 6953f85aa09c91..9207add142d64e 100644 --- a/consensus/src/dag/dag_driver.rs +++ b/consensus/src/dag/dag_driver.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use super::{ + dag_fetcher::FetchRequester, order_rule::OrderRule, storage::DAGStorage, types::{CertifiedAck, DAGMessage}, @@ -14,9 +15,10 @@ use crate::{ }, state_replication::PayloadClient, }; -use anyhow::bail; +use anyhow::{bail, Ok}; use aptos_consensus_types::common::{Author, Payload}; use aptos_infallible::RwLock; +use aptos_logger::error; use aptos_reliable_broadcast::ReliableBroadcast; use aptos_time_service::{TimeService, TimeServiceTrait}; use aptos_types::{block_info::Round, epoch_state::EpochState}; @@ -45,6 +47,7 @@ pub(crate) struct DagDriver { rb_abort_handle: Option, storage: Arc, order_rule: OrderRule, + fetch_requester: Arc, } impl DagDriver { @@ -58,6 +61,7 @@ impl DagDriver { time_service: TimeService, storage: Arc, order_rule: OrderRule, + fetch_requester: Arc, ) -> Self { // TODO: rebroadcast nodes after recovery Self { @@ -71,6 +75,7 @@ impl DagDriver { rb_abort_handle: None, storage, order_rule, + fetch_requester, } } @@ -88,7 +93,9 @@ impl DagDriver { let round = node.metadata().round(); if !dag_writer.all_exists(node.parents_metadata()) { - // TODO(ibalajiarun): implement fetching logic. + if let Err(err) = self.fetch_requester.request_for_certified_node(node) { + error!("request to fetch failed: {}", err); + } bail!(DagDriverError::MissingParents); } diff --git a/consensus/src/dag/dag_fetcher.rs b/consensus/src/dag/dag_fetcher.rs index f70bacc89b4cfa..db3da1f9630cfe 100644 --- a/consensus/src/dag/dag_fetcher.rs +++ b/consensus/src/dag/dag_fetcher.rs @@ -13,14 +13,58 @@ use aptos_infallible::RwLock; use aptos_logger::error; use aptos_time_service::TimeService; use aptos_types::epoch_state::EpochState; -use futures::StreamExt; +use futures::{stream::FuturesUnordered, StreamExt}; +use tokio::sync::{oneshot, mpsc::{Sender, Receiver}}; use std::{collections::HashMap, sync::Arc, time::Duration}; use thiserror::Error as ThisError; -use tokio::sync::{ - mpsc::{Receiver, Sender}, - oneshot, -}; +pub struct FetchRequester { + request_tx: Sender, + node_rx_futures: FuturesUnordered>, + certified_node_rx_futures: FuturesUnordered>, +} + +impl FetchRequester { + pub fn new(request_tx: Sender) -> Self { + Self { + request_tx, + node_rx_futures: FuturesUnordered::new(), + certified_node_rx_futures: FuturesUnordered::new(), + } + } + + pub fn request_for_node(&self, node: Node) -> anyhow::Result<()> { + let (res_tx, res_rx) = oneshot::channel(); + let fetch_req = LocalFetchRequest::Node(node, res_tx); + self.request_tx + .try_send(fetch_req) + .map_err(|e| anyhow::anyhow!("unable to send fetch request to channel: {}", e))?; + self.node_rx_futures.push(res_rx); + Ok(()) + } + + pub fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()> { + let (res_tx, res_rx) = oneshot::channel(); + let fetch_req = LocalFetchRequest::CertifiedNode(node, res_tx); + self.request_tx + .try_send(fetch_req) + .map_err(|e| anyhow::anyhow!("unable to send fetch request to channel: {}", e))?; + self.certified_node_rx_futures.push(res_rx); + Ok(()) + } + + pub async fn next_ready_node(&mut self) -> Option> { + self.node_rx_futures.next().await + } + + pub async fn next_ready_certified_node( + &mut self, + ) -> Option> { + self.certified_node_rx_futures.next().await + } +} + +#[derive(Debug)] pub enum LocalFetchRequest { Node(Node, oneshot::Sender), CertifiedNode(CertifiedNode, oneshot::Sender), @@ -55,7 +99,7 @@ impl LocalFetchRequest { } } -struct DagFetcher { +pub struct DagFetcher { epoch_state: Arc, network: Arc, dag: Arc>, @@ -69,7 +113,7 @@ impl DagFetcher { network: Arc, dag: Arc>, time_service: TimeService, - ) -> (Self, Sender) { + ) -> (Self, FetchRequester) { let (request_tx, request_rx) = tokio::sync::mpsc::channel(16); ( Self { @@ -79,7 +123,11 @@ impl DagFetcher { request_rx, time_service, }, - request_tx, + FetchRequester { + request_tx, + node_rx_futures: FuturesUnordered::new(), + certified_node_rx_futures: FuturesUnordered::new(), + }, ) } diff --git a/consensus/src/dag/dag_handler.rs b/consensus/src/dag/dag_handler.rs index 54f06b06c6fb89..ff047eef60abf4 100644 --- a/consensus/src/dag/dag_handler.rs +++ b/consensus/src/dag/dag_handler.rs @@ -1,7 +1,12 @@ // Copyright © Aptos Foundation use super::{ - order_rule::OrderRule, dag_driver::DagDriver, dag_fetcher::FetchRequestHandler, storage::DAGStorage, dag_network::DAGNetworkSender, types::TDAGMessage, + dag_driver::DagDriver, + dag_fetcher::{DagFetcher, FetchRequestHandler}, + dag_network::DAGNetworkSender, + order_rule::OrderRule, + storage::DAGStorage, + types::TDAGMessage, }; use crate::{ dag::{ @@ -41,7 +46,7 @@ impl NetworkHandler { epoch_state: Arc, storage: Arc, payload_client: Arc, - _dag_network_sender: Arc, + dag_network_sender: Arc, rb_network_sender: Arc>, time_service: TimeService, order_rule: OrderRule, @@ -52,6 +57,13 @@ impl NetworkHandler { ExponentialBackoff::from_millis(10), time_service.clone(), )); + // TODO: wire dag fetcher + let (_dag_fetcher, fetch_requester) = DagFetcher::new( + epoch_state.clone(), + dag_network_sender, + dag.clone(), + time_service.clone(), + ); Self { dag_rpc_rx, node_receiver: NodeBroadcastHandler::new( @@ -70,6 +82,7 @@ impl NetworkHandler { time_service, storage, order_rule, + Arc::new(fetch_requester), ), epoch_state: epoch_state.clone(), fetch_receiver: FetchRequestHandler::new(dag, epoch_state), diff --git a/consensus/src/dag/tests/dag_driver_tests.rs b/consensus/src/dag/tests/dag_driver_tests.rs index dba7b4e3ee8a46..4c7e11259cbbfd 100644 --- a/consensus/src/dag/tests/dag_driver_tests.rs +++ b/consensus/src/dag/tests/dag_driver_tests.rs @@ -2,13 +2,15 @@ use crate::{ dag::{ + anchor_election::RoundRobinAnchorElection, dag_driver::{DagDriver, DagDriverError}, + dag_fetcher::FetchRequester, dag_network::{DAGNetworkSender, RpcWithFallback}, dag_store::Dag, order_rule::OrderRule, tests::{dag_test::MockStorage, helpers::new_certified_node}, types::{CertifiedAck, DAGMessage}, - RpcHandler, anchor_election::RoundRobinAnchorElection, + RpcHandler, }, test_utils::MockPayloadManager, }; @@ -90,6 +92,10 @@ fn test_certified_node_handler() { Box::new(RoundRobinAnchorElection::new(validators)), ordered_nodes_sender, ); + + let (request_tx, _) = tokio::sync::mpsc::channel(10); + let fetch_requester = Arc::new(FetchRequester::new(request_tx)); + let mut driver = DagDriver::new( signers[0].author(), epoch_state, @@ -100,6 +106,7 @@ fn test_certified_node_handler() { time_service, storage, order_rule, + fetch_requester, ); // expect an ack for a valid message