Skip to content

Commit

Permalink
[dag] Integrate dag fetcher with RB node handler
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Aug 24, 2023
1 parent fad6e81 commit 644e1b1
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 61 deletions.
15 changes: 12 additions & 3 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ pub fn bootstrap_dag(
time_service.clone(),
));

let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone())));
let dag = Arc::new(RwLock::new(Dag::new(
epoch_state.clone(),
storage.clone(),
current_round,
)));

let anchor_election = Box::new(RoundRobinAnchorElection::new(validators));
let order_rule = OrderRule::new(
Expand Down Expand Up @@ -86,10 +90,15 @@ pub fn bootstrap_dag(
time_service,
storage.clone(),
order_rule,
fetch_requester.clone(),
);
let rb_handler = NodeBroadcastHandler::new(
dag.clone(),
signer,
epoch_state.clone(),
storage.clone(),
fetch_requester,
);
let rb_handler =
NodeBroadcastHandler::new(dag.clone(), signer, epoch_state.clone(), storage.clone());
let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone());

let dag_handler = NetworkHandler::new(
Expand Down
1 change: 1 addition & 0 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use super::{
};
use crate::{
dag::{
dag_fetcher::TFetchRequester,
dag_store::Dag,
types::{CertificateAckState, CertifiedNode, Node, NodeCertificate, SignatureBuilder},
},
Expand Down
11 changes: 8 additions & 3 deletions consensus/src/dag/dag_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,19 @@ impl<T> Stream for FetchWaiter<T> {
}
}

pub trait TFetchRequester: Send + Sync {
fn request_for_node(&self, node: Node) -> anyhow::Result<()>;
fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()>;
}

pub struct FetchRequester {
request_tx: Sender<LocalFetchRequest>,
node_waiter_tx: Sender<oneshot::Receiver<Node>>,
certified_node_waiter_tx: Sender<oneshot::Receiver<CertifiedNode>>,
}

impl FetchRequester {
pub fn request_for_node(&self, node: Node) -> anyhow::Result<()> {
impl TFetchRequester for FetchRequester {
fn request_for_node(&self, node: Node) -> anyhow::Result<()> {
let (res_tx, res_rx) = oneshot::channel();
let fetch_req = LocalFetchRequest::Node(node, res_tx);
self.request_tx
Expand All @@ -70,7 +75,7 @@ impl FetchRequester {
Ok(())
}

pub fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()> {
fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()> {
let (res_tx, res_rx) = oneshot::channel();
let fetch_req = LocalFetchRequest::CertifiedNode(node, res_tx);
self.request_tx.try_send(fetch_req).map_err(|e| {
Expand Down
12 changes: 9 additions & 3 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@ pub struct Dag {
/// Map between peer id to vector index
author_to_index: HashMap<Author, usize>,
storage: Arc<dyn DAGStorage>,
initial_round: Round,
}

impl Dag {
pub fn new(epoch_state: Arc<EpochState>, storage: Arc<dyn DAGStorage>) -> Self {
pub fn new(
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
initial_round: Round,
) -> Self {
let epoch = epoch_state.epoch;
let author_to_index = epoch_state.verifier.address_to_validator_index().clone();
let num_validators = author_to_index.len();
Expand Down Expand Up @@ -77,6 +82,7 @@ impl Dag {
nodes_by_round,
author_to_index,
storage,
initial_round,
}
}

Expand All @@ -85,15 +91,15 @@ impl Dag {
.nodes_by_round
.first_key_value()
.map(|(round, _)| round)
.unwrap_or(&0)
.unwrap_or(&self.initial_round)
}

pub fn highest_round(&self) -> Round {
*self
.nodes_by_round
.last_key_value()
.map(|(round, _)| round)
.unwrap_or(&0)
.unwrap_or(&self.initial_round)
}

pub fn add_node(&mut self, node: CertifiedNode) -> anyhow::Result<()> {
Expand Down
46 changes: 27 additions & 19 deletions consensus/src/dag/rb_handler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::{storage::DAGStorage, NodeId};
use super::{dag_fetcher::TFetchRequester, storage::DAGStorage, NodeId};
use crate::dag::{
dag_network::RpcHandler,
dag_store::Dag,
Expand All @@ -21,8 +21,8 @@ pub enum NodeBroadcastHandleError {
InvalidParent,
#[error("missing parents")]
MissingParents,
#[error("parents do not meet quorum voting power")]
NotEnoughParents,
#[error("stale round number")]
StaleRound(Round),
}

pub(crate) struct NodeBroadcastHandler {
Expand All @@ -31,6 +31,7 @@ pub(crate) struct NodeBroadcastHandler {
signer: ValidatorSigner,
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
fetch_requester: Arc<dyn TFetchRequester>,
}

impl NodeBroadcastHandler {
Expand All @@ -39,6 +40,7 @@ impl NodeBroadcastHandler {
signer: ValidatorSigner,
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
fetch_requester: Arc<dyn TFetchRequester>,
) -> Self {
let epoch = epoch_state.epoch;
let votes_by_round_peer = read_votes_from_storage(&storage, epoch);
Expand All @@ -49,6 +51,7 @@ impl NodeBroadcastHandler {
signer,
epoch_state,
storage,
fetch_requester,
}
}

Expand All @@ -67,21 +70,15 @@ impl NodeBroadcastHandler {
self.storage.delete_votes(to_delete)
}

fn validate(&self, node: &Node) -> anyhow::Result<()> {
fn validate(&self, node: Node) -> anyhow::Result<Node> {
let current_round = node.metadata().round();

// round 0 is a special case and does not require any parents
if current_round == 0 {
return Ok(());
}

let prev_round = current_round - 1;

let dag_reader = self.dag.read();
// check if the parent round is missing in the DAG
let lowest_round = dag_reader.lowest_round();

ensure!(
prev_round >= dag_reader.lowest_round(),
NodeBroadcastHandleError::MissingParents
current_round >= lowest_round,
NodeBroadcastHandleError::StaleRound(current_round)
);

// check which parents are missing in the DAG
Expand All @@ -91,19 +88,30 @@ impl NodeBroadcastHandler {
.filter(|parent| !dag_reader.exists(parent.metadata()))
.cloned()
.collect();
drop(dag_reader); // Drop the DAG store early as it is no longer required

if !missing_parents.is_empty() {
// For each missing parent, verify their signatures and voting power
// For each missing parent, verify their signatures and voting power.
// Otherwise, a malicious node can send bad nodes with fake parents
// and cause this peer to issue unnecessary fetch requests.
ensure!(
missing_parents
.iter()
.all(|parent| { parent.verify(&self.epoch_state.verifier).is_ok() }),
NodeBroadcastHandleError::InvalidParent
);
// TODO: notify dag fetcher to fetch missing node and drop this node
bail!(NodeBroadcastHandleError::MissingParents);

// Don't issue fetch requests for parents of the lowest round in the DAG
// because they are already GC'ed
if current_round > lowest_round {
if let Err(err) = self.fetch_requester.request_for_node(node) {
error!("request to fetch failed: {}", err);
}
bail!(NodeBroadcastHandleError::MissingParents);
}
}

Ok(())
Ok(node)
}
}

Expand Down Expand Up @@ -137,7 +145,7 @@ impl RpcHandler for NodeBroadcastHandler {
type Response = Vote;

fn process(&mut self, node: Self::Request) -> anyhow::Result<Self::Response> {
self.validate(&node)?;
let node = self.validate(node)?;

let votes_by_peer = self
.votes_by_round_peer
Expand Down
17 changes: 10 additions & 7 deletions consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ fn test_certified_node_handler() {
verifier: validator_verifier,
});
let storage = Arc::new(MockStorage::new());
let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone())));

let zeroth_round_node = new_certified_node(0, signers[0].author(), vec![]);
let dag = Arc::new(RwLock::new(Dag::new(
epoch_state.clone(),
storage.clone(),
1,
)));

let network_sender = Arc::new(MockNetworkSender {});
let rb = Arc::new(ReliableBroadcast::new(
Expand Down Expand Up @@ -115,13 +117,14 @@ fn test_certified_node_handler() {
fetch_requester,
);

let first_round_node = new_certified_node(1, signers[0].author(), vec![]);
// expect an ack for a valid message
assert_ok!(driver.process(zeroth_round_node.clone()));
assert_ok!(driver.process(first_round_node.clone()));
// expect an ack if the same message is sent again
assert_ok_eq!(driver.process(zeroth_round_node), CertifiedAck::new(1));
assert_ok_eq!(driver.process(first_round_node), CertifiedAck::new(1));

let parent_node = new_certified_node(0, signers[1].author(), vec![]);
let invalid_node = new_certified_node(1, signers[0].author(), vec![parent_node.certificate()]);
let parent_node = new_certified_node(1, signers[1].author(), vec![]);
let invalid_node = new_certified_node(2, signers[0].author(), vec![parent_node.certificate()]);
assert_eq!(
driver.process(invalid_node).unwrap_err().to_string(),
DagDriverError::MissingParents.to_string()
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/dag/tests/dag_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ fn setup() -> (Vec<ValidatorSigner>, Arc<EpochState>, Dag, Arc<MockStorage>) {
verifier: validator_verifier,
});
let storage = Arc::new(MockStorage::new());
let dag = Dag::new(epoch_state.clone(), storage.clone());
let dag = Dag::new(epoch_state.clone(), storage.clone(), 1);
(signers, epoch_state, dag, storage)
}

Expand Down Expand Up @@ -190,7 +190,7 @@ fn test_dag_recover_from_storage() {
assert!(dag.add_node(node).is_ok());
}
}
let new_dag = Dag::new(epoch_state.clone(), storage.clone());
let new_dag = Dag::new(epoch_state.clone(), storage.clone(), 1);

for metadata in &metadatas {
assert!(new_dag.exists(metadata));
Expand All @@ -201,7 +201,7 @@ fn test_dag_recover_from_storage() {
verifier: epoch_state.verifier.clone(),
});

let _new_epoch_dag = Dag::new(new_epoch_state, storage.clone());
let _new_epoch_dag = Dag::new(new_epoch_state, storage.clone(), 1);
assert!(storage.certified_node_data.lock().is_empty());
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/tests/fetcher_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn test_dag_fetcher_receiver() {
verifier: validator_verifier,
});
let storage = Arc::new(MockStorage::new());
let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage)));
let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage, 1)));

let mut fetcher = FetchRequestHandler::new(dag.clone(), epoch_state);

Expand Down
7 changes: 4 additions & 3 deletions consensus/src/dag/tests/order_rule_tests.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::helpers::new_certified_node;
use crate::{
dag::{
anchor_election::RoundRobinAnchorElection,
dag_store::Dag,
order_rule::OrderRule,
tests::{dag_test::MockStorage, helpers::new_certified_node},
tests::dag_test::MockStorage,
types::{NodeCertificate, NodeMetadata},
CertifiedNode,
},
Expand Down Expand Up @@ -153,7 +154,7 @@ proptest! {
epoch: 1,
verifier: validator_verifier,
});
let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()));
let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1);
for round_nodes in &nodes {
for node in round_nodes.iter().flatten() {
dag.add_node(node.clone()).unwrap();
Expand Down Expand Up @@ -231,7 +232,7 @@ fn test_order_rule_basic() {
epoch: 1,
verifier: validator_verifier,
});
let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()));
let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1);
for round_nodes in &nodes {
for node in round_nodes.iter().flatten() {
dag.add_node(node.clone()).unwrap();
Expand Down
Loading

0 comments on commit 644e1b1

Please sign in to comment.