-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[dag] Integrate dag fetcher with RB node handler #9585
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
// Copyright © Aptos Foundation | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use super::{storage::DAGStorage, NodeId}; | ||
use super::{dag_fetcher::TFetchRequester, storage::DAGStorage, NodeId}; | ||
use crate::dag::{ | ||
dag_network::RpcHandler, | ||
dag_store::Dag, | ||
|
@@ -21,8 +21,8 @@ pub enum NodeBroadcastHandleError { | |
InvalidParent, | ||
#[error("missing parents")] | ||
MissingParents, | ||
#[error("parents do not meet quorum voting power")] | ||
NotEnoughParents, | ||
#[error("stale round number")] | ||
StaleRound(Round), | ||
} | ||
|
||
pub(crate) struct NodeBroadcastHandler { | ||
|
@@ -31,6 +31,7 @@ pub(crate) struct NodeBroadcastHandler { | |
signer: ValidatorSigner, | ||
epoch_state: Arc<EpochState>, | ||
storage: Arc<dyn DAGStorage>, | ||
fetch_requester: Arc<dyn TFetchRequester>, | ||
} | ||
|
||
impl NodeBroadcastHandler { | ||
|
@@ -39,6 +40,7 @@ impl NodeBroadcastHandler { | |
signer: ValidatorSigner, | ||
epoch_state: Arc<EpochState>, | ||
storage: Arc<dyn DAGStorage>, | ||
fetch_requester: Arc<dyn TFetchRequester>, | ||
) -> Self { | ||
let epoch = epoch_state.epoch; | ||
let votes_by_round_peer = read_votes_from_storage(&storage, epoch); | ||
|
@@ -49,6 +51,7 @@ impl NodeBroadcastHandler { | |
signer, | ||
epoch_state, | ||
storage, | ||
fetch_requester, | ||
} | ||
} | ||
|
||
|
@@ -67,21 +70,15 @@ impl NodeBroadcastHandler { | |
self.storage.delete_votes(to_delete) | ||
} | ||
|
||
fn validate(&self, node: &Node) -> anyhow::Result<()> { | ||
fn validate(&self, node: Node) -> anyhow::Result<Node> { | ||
let current_round = node.metadata().round(); | ||
|
||
// round 0 is a special case and does not require any parents | ||
if current_round == 0 { | ||
return Ok(()); | ||
} | ||
|
||
let prev_round = current_round - 1; | ||
|
||
let dag_reader = self.dag.read(); | ||
// check if the parent round is missing in the DAG | ||
let lowest_round = dag_reader.lowest_round(); | ||
|
||
ensure!( | ||
prev_round >= dag_reader.lowest_round(), | ||
NodeBroadcastHandleError::MissingParents | ||
current_round >= lowest_round, | ||
NodeBroadcastHandleError::StaleRound(current_round) | ||
); | ||
|
||
// check which parents are missing in the DAG | ||
|
@@ -91,19 +88,30 @@ impl NodeBroadcastHandler { | |
.filter(|parent| !dag_reader.exists(parent.metadata())) | ||
.cloned() | ||
.collect(); | ||
drop(dag_reader); // Drop the DAG store early as it is no longer required | ||
|
||
if !missing_parents.is_empty() { | ||
// For each missing parent, verify their signatures and voting power | ||
// For each missing parent, verify their signatures and voting power. | ||
// Otherwise, a malicious node can send bad nodes with fake parents | ||
// and cause this peer to issue unnecessary fetch requests. | ||
ensure!( | ||
missing_parents | ||
.iter() | ||
.all(|parent| { parent.verify(&self.epoch_state.verifier).is_ok() }), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be in node.verify()? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is to only verify the missing parents. Ideally, we won't miss any parents in the normal case and don't have to verify at all. |
||
NodeBroadcastHandleError::InvalidParent | ||
); | ||
// TODO: notify dag fetcher to fetch missing node and drop this node | ||
bail!(NodeBroadcastHandleError::MissingParents); | ||
|
||
// Don't issue fetch requests for parents of the lowest round in the DAG | ||
// because they are already GC'ed | ||
if current_round > lowest_round { | ||
if let Err(err) = self.fetch_requester.request_for_node(node) { | ||
error!("request to fetch failed: {}", err); | ||
} | ||
bail!(NodeBroadcastHandleError::MissingParents); | ||
} | ||
} | ||
|
||
Ok(()) | ||
Ok(node) | ||
} | ||
} | ||
|
||
|
@@ -137,7 +145,7 @@ impl RpcHandler for NodeBroadcastHandler { | |
type Response = Vote; | ||
|
||
fn process(&mut self, node: Self::Request) -> anyhow::Result<Self::Response> { | ||
self.validate(&node)?; | ||
let node = self.validate(node)?; | ||
|
||
let votes_by_peer = self | ||
.votes_by_round_peer | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we still need to filter out the lower round right? otherwise we'd fetch all those nodes from gc'ed rounds
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is a good point. I can send you a node from a very old round and force you to fetch if it is below the GC level. We should ignore nodes from rounds below the GC level.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I needed the DAG store to support this minimum round to fix this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's becoming hard to write unit tests without a verifier trait. I will follow-up to introduce that trait and then we can have more unit tests for this case. Added TODO for now.