Skip to content

Commit

Permalink
[dag] Integrate dag fetcher with RB node handler
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Aug 14, 2023
1 parent 7a0a66f commit ec7a202
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 20 deletions.
4 changes: 2 additions & 2 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ pub fn bootstrap_dag(
time_service,
storage.clone(),
order_rule,
fetch_requester,
fetch_requester.clone(),
);
let rb_handler =
NodeBroadcastHandler::new(dag.clone(), signer, epoch_state.clone(), storage.clone());
NodeBroadcastHandler::new(dag.clone(), signer, epoch_state.clone(), storage.clone(), fetch_requester);
let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone());

let dag_handler = NetworkHandler::new(
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::{
use crate::{
dag::{
dag_store::Dag,
types::{CertificateAckState, CertifiedNode, Node, NodeCertificate, SignatureBuilder},
types::{CertificateAckState, CertifiedNode, Node, NodeCertificate, SignatureBuilder}, dag_fetcher::TFetchRequester,
},
state_replication::PayloadClient,
};
Expand Down
11 changes: 8 additions & 3 deletions consensus/src/dag/dag_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,19 @@ impl<T> Stream for FetchWaiter<T> {
}
}

pub trait TFetchRequester: Send + Sync {
fn request_for_node(&self, node: Node) -> anyhow::Result<()>;
fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()>;
}

pub struct FetchRequester {
request_tx: Sender<LocalFetchRequest>,
node_waiter_tx: Sender<oneshot::Receiver<Node>>,
certified_node_waiter_tx: Sender<oneshot::Receiver<CertifiedNode>>,
}

impl FetchRequester {
pub fn request_for_node(&self, node: Node) -> anyhow::Result<()> {
impl TFetchRequester for FetchRequester {
fn request_for_node(&self, node: Node) -> anyhow::Result<()> {
let (res_tx, res_rx) = oneshot::channel();
let fetch_req = LocalFetchRequest::Node(node, res_tx);
self.request_tx
Expand All @@ -70,7 +75,7 @@ impl FetchRequester {
Ok(())
}

pub fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()> {
fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()> {
let (res_tx, res_rx) = oneshot::channel();
let fetch_req = LocalFetchRequest::CertifiedNode(node, res_tx);
self.request_tx.try_send(fetch_req).map_err(|e| {
Expand Down
29 changes: 19 additions & 10 deletions consensus/src/dag/rb_handler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::{storage::DAGStorage, NodeId};
use super::{dag_fetcher::TFetchRequester, storage::DAGStorage, NodeId};
use crate::dag::{
dag_network::RpcHandler,
dag_store::Dag,
Expand All @@ -23,6 +23,8 @@ pub enum NodeBroadcastHandleError {
MissingParents,
#[error("parents do not meet quorum voting power")]
NotEnoughParents,
#[error("invalid round number")]
InvalidRound,
}

pub(crate) struct NodeBroadcastHandler {
Expand All @@ -31,6 +33,7 @@ pub(crate) struct NodeBroadcastHandler {
signer: ValidatorSigner,
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
fetch_requester: Arc<dyn TFetchRequester>,
}

impl NodeBroadcastHandler {
Expand All @@ -39,6 +42,7 @@ impl NodeBroadcastHandler {
signer: ValidatorSigner,
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
fetch_requester: Arc<dyn TFetchRequester>,
) -> Self {
let epoch = epoch_state.epoch;
let votes_by_round_peer = read_votes_from_storage(&storage, epoch);
Expand All @@ -49,6 +53,7 @@ impl NodeBroadcastHandler {
signer,
epoch_state,
storage,
fetch_requester,
}
}

Expand All @@ -67,22 +72,24 @@ impl NodeBroadcastHandler {
self.storage.delete_votes(to_delete)
}

fn validate(&self, node: &Node) -> anyhow::Result<()> {
fn validate(&self, node: Node) -> anyhow::Result<Node> {
let current_round = node.metadata().round();

// round 0 is a special case and does not require any parents
if current_round == 0 {
return Ok(());
bail!(NodeBroadcastHandleError::InvalidRound);
}

let prev_round = current_round - 1;

let dag_reader = self.dag.read();
// check if the parent round is missing in the DAG
ensure!(
prev_round >= dag_reader.lowest_round(),
NodeBroadcastHandleError::MissingParents
);
if prev_round >= dag_reader.lowest_round() {
if let Err(err) = self.fetch_requester.request_for_node(node) {
error!("request to fetch failed: {}", err);
}
bail!(NodeBroadcastHandleError::MissingParents);
}

// check which parents are missing in the DAG
let missing_parents: Vec<NodeCertificate> = node
Expand All @@ -99,11 +106,13 @@ impl NodeBroadcastHandler {
.all(|parent| { parent.verify(&self.epoch_state.verifier).is_ok() }),
NodeBroadcastHandleError::InvalidParent
);
// TODO: notify dag fetcher to fetch missing node and drop this node
if let Err(err) = self.fetch_requester.request_for_node(node) {
error!("request to fetch failed: {}", err);
}
bail!(NodeBroadcastHandleError::MissingParents);
}

Ok(())
Ok(node)
}
}

Expand Down Expand Up @@ -137,7 +146,7 @@ impl RpcHandler for NodeBroadcastHandler {
type Response = Vote;

fn process(&mut self, node: Self::Request) -> anyhow::Result<Self::Response> {
self.validate(&node)?;
let node = self.validate(node)?;

let votes_by_peer = self
.votes_by_round_peer
Expand Down
39 changes: 35 additions & 4 deletions consensus/src/dag/tests/rb_handler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::dag::{
dag_fetcher::TFetchRequester,
dag_store::Dag,
rb_handler::{NodeBroadcastHandleError, NodeBroadcastHandler},
storage::DAGStorage,
Expand All @@ -17,6 +18,18 @@ use aptos_types::{
use claims::{assert_ok, assert_ok_eq};
use std::{collections::BTreeMap, sync::Arc};

struct MockFetchRequester {}

impl TFetchRequester for MockFetchRequester {
fn request_for_node(&self, node: crate::dag::Node) -> anyhow::Result<()> {
Ok(())
}

fn request_for_certified_node(&self, node: crate::dag::CertifiedNode) -> anyhow::Result<()> {
Ok(())
}
}

#[tokio::test]
async fn test_node_broadcast_receiver_succeed() {
let (signers, validator_verifier) = random_validator_verifier(4, None, false);
Expand All @@ -32,7 +45,13 @@ async fn test_node_broadcast_receiver_succeed() {

assert_ne!(wellformed_node.digest(), equivocating_node.digest());

let mut rb_receiver = NodeBroadcastHandler::new(dag, signers[3].clone(), epoch_state, storage);
let mut rb_receiver = NodeBroadcastHandler::new(
dag,
signers[3].clone(),
epoch_state,
storage,
Arc::new(MockFetchRequester {}),
);

let expected_result = Vote::new(
wellformed_node.metadata().clone(),
Expand All @@ -58,7 +77,13 @@ async fn test_node_broadcast_receiver_failure() {
let storage = Arc::new(MockStorage::new());
let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone())));

NodeBroadcastHandler::new(dag, signer.clone(), epoch_state.clone(), storage)
NodeBroadcastHandler::new(
dag,
signer.clone(),
epoch_state.clone(),
storage,
Arc::new(MockFetchRequester {}),
)
})
.collect();

Expand Down Expand Up @@ -130,6 +155,7 @@ fn test_node_broadcast_receiver_storage() {
signers[3].clone(),
epoch_state.clone(),
storage.clone(),
Arc::new(MockFetchRequester {}),
);
let sig = rb_receiver.process(node).expect("must succeed");

Expand All @@ -138,8 +164,13 @@ fn test_node_broadcast_receiver_storage() {
sig
)],);

let mut rb_receiver =
NodeBroadcastHandler::new(dag, signers[3].clone(), epoch_state, storage.clone());
let mut rb_receiver = NodeBroadcastHandler::new(
dag,
signers[3].clone(),
epoch_state,
storage.clone(),
Arc::new(MockFetchRequester {}),
);
assert_ok!(rb_receiver.gc_before_round(2));
assert_eq!(storage.get_votes().unwrap().len(), 0);
}

0 comments on commit ec7a202

Please sign in to comment.