Skip to content

Commit

Permalink
[dag] Integrate Order Rule with Dag Driver
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Aug 11, 2023
1 parent 33d65fc commit 5058b7b
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 10 deletions.
12 changes: 10 additions & 2 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use super::{
order_rule::OrderRule,
storage::DAGStorage,
types::{CertifiedAck, DAGMessage},
RpcHandler,
Expand Down Expand Up @@ -43,6 +44,7 @@ pub(crate) struct DagDriver {
time_service: TimeService,
rb_abort_handle: Option<AbortHandle>,
storage: Arc<dyn DAGStorage>,
order_rule: OrderRule,
}

impl DagDriver {
Expand All @@ -55,6 +57,7 @@ impl DagDriver {
current_round: Round,
time_service: TimeService,
storage: Arc<dyn DAGStorage>,
order_rule: OrderRule,
) -> Self {
// TODO: rebroadcast nodes after recovery
Self {
Expand All @@ -67,6 +70,7 @@ impl DagDriver {
time_service,
rb_abort_handle: None,
storage,
order_rule,
}
}

Expand Down Expand Up @@ -149,11 +153,15 @@ impl RpcHandler for DagDriver {
{
let dag_reader = self.dag.read();
if dag_reader.exists(node.metadata()) {
return Ok(CertifiedAck::new(node.metadata().epoch()));
return Ok(CertifiedAck::new(epoch));
}
}

self.add_node(node)?;
let node_metadata = node.metadata().clone();
self.add_node(node).and_then(|_| {
self.order_rule.process_new_node(&node_metadata);
Ok(())
})?;

Ok(CertifiedAck::new(epoch))
}
Expand Down
5 changes: 3 additions & 2 deletions consensus/src/dag/dag_handler.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
// Copyright © Aptos Foundation

use super::{
dag_driver::DagDriver, dag_fetcher::FetchRequestHandler, dag_network::DAGNetworkSender,
storage::DAGStorage, types::TDAGMessage,
order_rule::OrderRule, dag_driver::DagDriver, dag_fetcher::FetchRequestHandler, storage::DAGStorage, dag_network::DAGNetworkSender, types::TDAGMessage,
};
use crate::{
dag::{
Expand Down Expand Up @@ -45,6 +44,7 @@ impl NetworkHandler {
_dag_network_sender: Arc<dyn DAGNetworkSender>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
time_service: TimeService,
order_rule: OrderRule,
) -> Self {
let rb = Arc::new(ReliableBroadcast::new(
epoch_state.verifier.get_ordered_account_addresses().clone(),
Expand All @@ -69,6 +69,7 @@ impl NetworkHandler {
1,
time_service,
storage,
order_rule,
),
epoch_state: epoch_state.clone(),
fetch_receiver: FetchRequestHandler::new(dag, epoch_state),
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/dag/order_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ impl OrderRule {
(r1 ^ r2) & 1 == 0
}

pub fn process_new_node(&mut self, node: &CertifiedNode) {
let round = node.round();
pub fn process_new_node(&mut self, node_metadata: &NodeMetadata) {
let round = node_metadata.round();
// If the node comes from the proposal round in the current instance, it can't trigger any ordering
if round <= self.lowest_unordered_anchor_round
|| Self::check_parity(round, self.lowest_unordered_anchor_round)
Expand Down
17 changes: 15 additions & 2 deletions consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@ use crate::{
dag_driver::{DagDriver, DagDriverError},
dag_network::{DAGNetworkSender, RpcWithFallback},
dag_store::Dag,
order_rule::OrderRule,
tests::{dag_test::MockStorage, helpers::new_certified_node},
types::{CertifiedAck, DAGMessage},
RpcHandler,
RpcHandler, anchor_election::RoundRobinAnchorElection,
},
test_utils::MockPayloadManager,
};
use aptos_consensus_types::common::Author;
use aptos_infallible::RwLock;
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_time_service::TimeService;
use aptos_types::{epoch_state::EpochState, validator_verifier::random_validator_verifier};
use aptos_types::{
epoch_state::EpochState, ledger_info::LedgerInfo, validator_verifier::random_validator_verifier,
};
use async_trait::async_trait;
use claims::{assert_ok, assert_ok_eq};
use std::{sync::Arc, time::Duration};
Expand Down Expand Up @@ -78,6 +81,15 @@ fn test_certified_node_handler() {
aptos_time_service::TimeService::mock(),
));
let time_service = TimeService::mock();
let (ordered_nodes_sender, _) = futures_channel::mpsc::unbounded();
let validators = signers.iter().map(|vs| vs.author()).collect();
let order_rule = OrderRule::new(
epoch_state.clone(),
LedgerInfo::mock_genesis(None),
dag.clone(),
Box::new(RoundRobinAnchorElection::new(validators)),
ordered_nodes_sender,
);
let mut driver = DagDriver::new(
signers[0].author(),
epoch_state,
Expand All @@ -87,6 +99,7 @@ fn test_certified_node_handler() {
1,
time_service,
storage,
order_rule,
);

// expect an ack for a valid message
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/dag/tests/order_rule_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ proptest! {
let dag = Arc::new(RwLock::new(dag.clone()));
let (mut order_rule, mut receiver) = create_order_rule(epoch_state.clone(), dag);
for idx in seq {
order_rule.process_new_node(&flatten_nodes[idx]);
order_rule.process_new_node(flatten_nodes[idx].metadata());
}
let mut ordered = vec![];
while let Ok(Some(mut ordered_nodes)) = receiver.try_next() {
Expand Down Expand Up @@ -241,7 +241,7 @@ fn test_order_rule_basic() {
let dag = Arc::new(RwLock::new(dag.clone()));
let (mut order_rule, mut receiver) = create_order_rule(epoch_state, dag);
for node in nodes.iter().flatten().flatten() {
order_rule.process_new_node(node);
order_rule.process_new_node(node.metadata());
}
let expected_order = vec![
// anchor (1, 0) has 1 votes, anchor (3, 1) has 2 votes and a path to (1, 0)
Expand Down

0 comments on commit 5058b7b

Please sign in to comment.