Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dag] bootstrap logic #9455

Merged
merged 3 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion consensus/src/dag/anchor_election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use aptos_consensus_types::common::{Author, Round};

pub trait AnchorElection {
pub trait AnchorElection: Send {
fn get_anchor(&self, round: Round) -> Author;

fn commit(&mut self, round: Round);
Expand Down
117 changes: 117 additions & 0 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright © Aptos Foundation

use super::{
anchor_election::RoundRobinAnchorElection,
dag_driver::DagDriver,
dag_fetcher::{DagFetcher, FetchRequestHandler},
dag_handler::NetworkHandler,
dag_network::DAGNetworkSender,
dag_store::Dag,
order_rule::OrderRule,
rb_handler::NodeBroadcastHandler,
storage::DAGStorage,
types::DAGMessage,
CertifiedNode,
};
use crate::{network::IncomingDAGRequest, state_replication::PayloadClient};
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_consensus_types::common::Author;
use aptos_infallible::RwLock;
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_types::{
epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner,
};
use futures::stream::{AbortHandle, Abortable};
use std::sync::Arc;
use tokio_retry::strategy::ExponentialBackoff;

/// Builds the DAG consensus components from the given epoch state and starts
/// them as background tasks.
///
/// The function constructs, in order: the reliable-broadcast instance, the
/// shared `Dag` store, a round-robin anchor election, the `OrderRule`, the
/// `DagFetcher` (plus its requester and waiters), the `DagDriver`, and the
/// broadcast/fetch request handlers. These are wired into a `NetworkHandler`,
/// and both `dag_handler.start()` and `dag_fetcher.start()` are spawned with
/// `tokio::spawn`, each wrapped in an `Abortable`.
///
/// Because `tokio::spawn` is used, this must be called from within a Tokio
/// runtime context. The join handles returned by `tokio::spawn` are not kept,
/// so the returned abort handles are the only way this function exposes to
/// stop the tasks explicitly.
///
/// Returns a tuple of:
/// 1. the `AbortHandle` for the network handler task,
/// 2. the `AbortHandle` for the DAG fetcher task,
/// 3. the sender side of the channel whose receiver is consumed by the
///    `NetworkHandler` (carries `IncomingDAGRequest`s keyed by `Author`),
/// 4. the receiver side of the unbounded channel whose sender is handed to the
///    `OrderRule` (yields `Vec<Arc<CertifiedNode>>`).
pub fn bootstrap_dag(
self_peer: Author,
signer: ValidatorSigner,
epoch_state: Arc<EpochState>,
latest_ledger_info: LedgerInfo,
storage: Arc<dyn DAGStorage>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
dag_network_sender: Arc<dyn DAGNetworkSender>,
time_service: aptos_time_service::TimeService,
payload_client: Arc<dyn PayloadClient>,
) -> (
AbortHandle,
AbortHandle,
aptos_channel::Sender<Author, IncomingDAGRequest>,
futures_channel::mpsc::UnboundedReceiver<Vec<Arc<CertifiedNode>>>,
) {
let validators = epoch_state.verifier.get_ordered_account_addresses();
// Round taken from the latest ledger info; handed to the DagDriver below.
let current_round = latest_ledger_info.round();

// `ordered_nodes_tx` goes to the OrderRule; `ordered_nodes_rx` is returned to the caller.
let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
// `dag_rpc_tx` is returned to the caller; `dag_rpc_rx` is consumed by the NetworkHandler.
// NOTE(review): 64 is presumably the queue capacity of the FIFO channel — confirm
// against the aptos_channel API.
let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);

// A backoff policy that starts at 100ms and doubles each iteration.
let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50);
let rb = Arc::new(ReliableBroadcast::new(
// Cloned because `validators` itself is moved into the anchor election below.
validators.clone(),
rb_network_sender,
rb_backoff_policy,
time_service.clone(),
));

// Single Dag instance shared (via Arc<RwLock<..>>) by the order rule, fetcher,
// driver, and both request handlers constructed below.
let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone())));

let anchor_election = Box::new(RoundRobinAnchorElection::new(validators));
let order_rule = OrderRule::new(
epoch_state.clone(),
latest_ledger_info,
dag.clone(),
anchor_election,
ordered_nodes_tx,
);

// The fetcher itself is spawned as its own task at the end of this function;
// the requester goes to the DagDriver and the two waiters go to the NetworkHandler.
let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) =
DagFetcher::new(
epoch_state.clone(),
dag_network_sender,
dag.clone(),
time_service.clone(),
);
let fetch_requester = Arc::new(fetch_requester);

let dag_driver = DagDriver::new(
self_peer,
epoch_state.clone(),
dag.clone(),
payload_client,
rb,
current_round,
time_service,
storage.clone(),
order_rule,
fetch_requester,
);
let rb_handler =
NodeBroadcastHandler::new(dag.clone(), signer, epoch_state.clone(), storage.clone());
let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone());

let dag_handler = NetworkHandler::new(
epoch_state,
dag_rpc_rx,
rb_handler,
dag_driver,
fetch_handler,
node_fetch_waiter,
certified_node_fetch_waiter,
);

// One abort handle/registration pair per spawned task ("nh" = network handler,
// "df" = DAG fetcher). Per the review thread below, these are intended for shutdown.
let (nh_abort_handle, nh_abort_registration) = AbortHandle::new_pair();
let (df_abort_handle, df_abort_registration) = AbortHandle::new_pair();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are those used for shutdown?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that was my idea.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typically we don't need those since dropping the sender would result in the task stops if it's something like this

  while let Some(x) = rx.next().await {}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am planning to introduce an interval timer for garbage collection. Let me keep it for now, and we can clean up later if it becomes unnecessary.


// The JoinHandles returned by `tokio::spawn` are intentionally not bound here.
tokio::spawn(Abortable::new(dag_handler.start(), nh_abort_registration));
tokio::spawn(Abortable::new(dag_fetcher.start(), df_abort_registration));

(
nh_abort_handle,
df_abort_handle,
dag_rpc_tx,
ordered_nodes_rx,
)
}
78 changes: 17 additions & 61 deletions consensus/src/dag/dag_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,101 +2,57 @@

use super::{
dag_driver::DagDriver,
dag_fetcher::{DagFetcher, FetchRequestHandler, FetchWaiter},
dag_network::DAGNetworkSender,
order_rule::OrderRule,
storage::DAGStorage,
dag_fetcher::{FetchRequestHandler, FetchWaiter},
types::TDAGMessage,
CertifiedNode, Node,
};
use crate::{
dag::{
dag_network::RpcHandler, dag_store::Dag, rb_handler::NodeBroadcastHandler,
types::DAGMessage,
},
dag::{dag_network::RpcHandler, rb_handler::NodeBroadcastHandler, types::DAGMessage},
network::{IncomingDAGRequest, TConsensusMsg},
state_replication::PayloadClient,
};
use anyhow::bail;
use aptos_channels::aptos_channel;
use aptos_consensus_types::common::Author;
use aptos_infallible::RwLock;
use aptos_logger::{error, warn};
use aptos_network::protocols::network::RpcError;
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_time_service::TimeService;
use aptos_types::{epoch_state::EpochState, validator_signer::ValidatorSigner};
use aptos_types::epoch_state::EpochState;
use bytes::Bytes;
use futures::StreamExt;
use std::sync::Arc;
use tokio::select;
use tokio_retry::strategy::ExponentialBackoff;

struct NetworkHandler {
pub(crate) struct NetworkHandler {
epoch_state: Arc<EpochState>,
dag_rpc_rx: aptos_channel::Receiver<Author, IncomingDAGRequest>,
node_receiver: NodeBroadcastHandler,
dag_driver: DagDriver,
fetch_receiver: FetchRequestHandler,
epoch_state: Arc<EpochState>,
node_fetch_waiter: FetchWaiter<Node>,
certified_node_fetch_waiter: FetchWaiter<CertifiedNode>,
}

impl NetworkHandler {
fn new(
dag: Arc<RwLock<Dag>>,
dag_rpc_rx: aptos_channel::Receiver<Author, IncomingDAGRequest>,
signer: ValidatorSigner,
pub fn new(
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
payload_client: Arc<dyn PayloadClient>,
dag_network_sender: Arc<dyn DAGNetworkSender>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
time_service: TimeService,
order_rule: OrderRule,
dag_rpc_rx: aptos_channel::Receiver<Author, IncomingDAGRequest>,
node_receiver: NodeBroadcastHandler,
dag_driver: DagDriver,
fetch_receiver: FetchRequestHandler,
node_fetch_waiter: FetchWaiter<Node>,
certified_node_fetch_waiter: FetchWaiter<CertifiedNode>,
) -> Self {
let rb = Arc::new(ReliableBroadcast::new(
epoch_state.verifier.get_ordered_account_addresses().clone(),
rb_network_sender,
ExponentialBackoff::from_millis(10),
time_service.clone(),
));
let (_dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) =
DagFetcher::new(
epoch_state.clone(),
dag_network_sender,
dag.clone(),
time_service.clone(),
);
let fetch_requester = Arc::new(fetch_requester);
Self {
epoch_state,
dag_rpc_rx,
node_receiver: NodeBroadcastHandler::new(
dag.clone(),
signer.clone(),
epoch_state.clone(),
storage.clone(),
),
dag_driver: DagDriver::new(
signer.author(),
epoch_state.clone(),
dag.clone(),
payload_client,
rb,
1,
time_service,
storage,
order_rule,
fetch_requester,
),
epoch_state: epoch_state.clone(),
fetch_receiver: FetchRequestHandler::new(dag, epoch_state),
node_receiver,
dag_driver,
fetch_receiver,
node_fetch_waiter,
certified_node_fetch_waiter,
}
}

async fn start(mut self) {
pub async fn start(mut self) {
self.dag_driver.try_enter_new_round();

// TODO(ibalajiarun): clean up Reliable Broadcast storage periodically.
Expand Down
1 change: 1 addition & 0 deletions consensus/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#![allow(dead_code)]

mod anchor_election;
mod bootstrap;
mod dag_driver;
mod dag_fetcher;
mod dag_handler;
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/rb_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum NodeBroadcastHandleError {
NotEnoughParents,
}

pub struct NodeBroadcastHandler {
pub(crate) struct NodeBroadcastHandler {
dag: Arc<RwLock<Dag>>,
votes_by_round_peer: BTreeMap<Round, BTreeMap<Author, Vote>>,
signer: ValidatorSigner,
Expand Down
Loading