From fbad1a5ac79cbcfe44f8d23be7ff2f5d1b5a4d57 Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Wed, 2 Aug 2023 10:56:05 -0700 Subject: [PATCH 1/2] [dag] bootstrap logic --- consensus/src/dag/anchor_election.rs | 2 +- consensus/src/dag/bootstrap.rs | 117 +++++++++++++++++++++++++++ consensus/src/dag/dag_handler.rs | 78 ++++-------------- consensus/src/dag/mod.rs | 1 + consensus/src/dag/rb_handler.rs | 2 +- 5 files changed, 137 insertions(+), 63 deletions(-) create mode 100644 consensus/src/dag/bootstrap.rs diff --git a/consensus/src/dag/anchor_election.rs b/consensus/src/dag/anchor_election.rs index c04bbf6998048..ff7c45dc5a72b 100644 --- a/consensus/src/dag/anchor_election.rs +++ b/consensus/src/dag/anchor_election.rs @@ -3,7 +3,7 @@ use aptos_consensus_types::common::{Author, Round}; -pub trait AnchorElection { +pub trait AnchorElection: Send { fn get_anchor(&self, round: Round) -> Author; fn commit(&mut self, round: Round); diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs new file mode 100644 index 0000000000000..6f96d036c51f5 --- /dev/null +++ b/consensus/src/dag/bootstrap.rs @@ -0,0 +1,117 @@ +// Copyright © Aptos Foundation + +use super::{ + anchor_election::RoundRobinAnchorElection, + dag_driver::DagDriver, + dag_fetcher::{DagFetcher, FetchRequestHandler}, + dag_handler::NetworkHandler, + dag_network::DAGNetworkSender, + dag_store::Dag, + order_rule::OrderRule, + rb_handler::NodeBroadcastHandler, + storage::DAGStorage, + types::DAGMessage, + CertifiedNode, +}; +use crate::{network::IncomingDAGRequest, state_replication::PayloadClient}; +use aptos_channels::{aptos_channel, message_queues::QueueStyle}; +use aptos_consensus_types::common::Author; +use aptos_infallible::RwLock; +use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast}; +use aptos_types::{ + epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner, +}; +use futures::stream::{AbortHandle, Abortable}; +use std::sync::Arc; +use tokio_retry::strategy::ExponentialBackoff; + +pub fn bootstrap_dag( + self_peer: Author, + signer: ValidatorSigner, + epoch_state: Arc, + latest_ledger_info: LedgerInfo, + storage: Arc, + rb_network_sender: Arc>, + dag_network_sender: Arc, + time_service: aptos_time_service::TimeService, + payload_client: Arc, +) -> ( + AbortHandle, + AbortHandle, + aptos_channel::Sender, + futures_channel::mpsc::UnboundedReceiver>>, +) { + let validators = epoch_state.verifier.get_ordered_account_addresses(); + let current_round = latest_ledger_info.round(); + + let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded(); + let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None); + + // A backoff policy that starts at 100ms and doubles each iteration. + let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50); + let rb = Arc::new(ReliableBroadcast::new( + validators.clone(), + rb_network_sender, + rb_backoff_policy, + time_service.clone(), + )); + + let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone()))); + + let anchor_election = Box::new(RoundRobinAnchorElection::new(validators)); + let order_rule = OrderRule::new( + epoch_state.clone(), + latest_ledger_info, + dag.clone(), + anchor_election, + ordered_nodes_tx, + ); + + let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) = + DagFetcher::new( + epoch_state.clone(), + dag_network_sender, + dag.clone(), + time_service.clone(), + ); + let fetch_requester = Arc::new(fetch_requester); + + let dag_driver = DagDriver::new( + self_peer, + epoch_state.clone(), + dag.clone(), + payload_client, + rb, + current_round, + time_service, + storage.clone(), + order_rule, + fetch_requester, + ); + let rb_handler = + NodeBroadcastHandler::new(dag.clone(), signer, epoch_state.clone(), storage.clone()); + let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone()); + + let dag_handler = NetworkHandler::new( + epoch_state, + dag_rpc_rx, + rb_handler, + dag_driver, + fetch_handler, + node_fetch_waiter, + certified_node_fetch_waiter, + ); + + let (nh_abort_handle, nh_abort_registration) = AbortHandle::new_pair(); + let (df_abort_handle, df_abort_registration) = AbortHandle::new_pair(); + + tokio::spawn(Abortable::new(dag_handler.start(), nh_abort_registration)); + tokio::spawn(Abortable::new(dag_fetcher.start(), df_abort_registration)); + + ( + nh_abort_handle, + df_abort_handle, + dag_rpc_tx, + ordered_nodes_rx, + ) +} diff --git a/consensus/src/dag/dag_handler.rs b/consensus/src/dag/dag_handler.rs index 9e67ab0f63640..e1da8eb1a1b6e 100644 --- a/consensus/src/dag/dag_handler.rs +++ b/consensus/src/dag/dag_handler.rs @@ -2,101 +2,57 @@ use super::{ dag_driver::DagDriver, - dag_fetcher::{DagFetcher, FetchRequestHandler, FetchWaiter}, - dag_network::DAGNetworkSender, - order_rule::OrderRule, - storage::DAGStorage, + dag_fetcher::{FetchRequestHandler, FetchWaiter}, types::TDAGMessage, CertifiedNode, Node, }; use crate::{ - dag::{ - dag_network::RpcHandler, dag_store::Dag, rb_handler::NodeBroadcastHandler, - types::DAGMessage, - }, + dag::{dag_network::RpcHandler, rb_handler::NodeBroadcastHandler, types::DAGMessage}, network::{IncomingDAGRequest, TConsensusMsg}, - state_replication::PayloadClient, }; use anyhow::bail; use aptos_channels::aptos_channel; use aptos_consensus_types::common::Author; -use aptos_infallible::RwLock; use aptos_logger::{error, warn}; use aptos_network::protocols::network::RpcError; -use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast}; -use aptos_time_service::TimeService; -use aptos_types::{epoch_state::EpochState, validator_signer::ValidatorSigner}; +use aptos_types::epoch_state::EpochState; use bytes::Bytes; use futures::StreamExt; use std::sync::Arc; use tokio::select; -use tokio_retry::strategy::ExponentialBackoff; -struct NetworkHandler { +pub(crate) struct NetworkHandler { + epoch_state: Arc, dag_rpc_rx: aptos_channel::Receiver, node_receiver: NodeBroadcastHandler, dag_driver: DagDriver, fetch_receiver: FetchRequestHandler, - epoch_state: Arc, node_fetch_waiter: FetchWaiter, certified_node_fetch_waiter: FetchWaiter, } impl NetworkHandler { - fn new( - dag: Arc>, - dag_rpc_rx: aptos_channel::Receiver, - signer: ValidatorSigner, + pub fn new( epoch_state: Arc, - storage: Arc, - payload_client: Arc, - dag_network_sender: Arc, - rb_network_sender: Arc>, - time_service: TimeService, - order_rule: OrderRule, + dag_rpc_rx: aptos_channel::Receiver, + node_receiver: NodeBroadcastHandler, + dag_driver: DagDriver, + fetch_receiver: FetchRequestHandler, + node_fetch_waiter: FetchWaiter, + certified_node_fetch_waiter: FetchWaiter, ) -> Self { - let rb = Arc::new(ReliableBroadcast::new( - epoch_state.verifier.get_ordered_account_addresses().clone(), - rb_network_sender, - ExponentialBackoff::from_millis(10), - time_service.clone(), - )); - let (_dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) = - DagFetcher::new( - epoch_state.clone(), - dag_network_sender, - dag.clone(), - time_service.clone(), - ); - let fetch_requester = Arc::new(fetch_requester); Self { + epoch_state, dag_rpc_rx, - node_receiver: NodeBroadcastHandler::new( - dag.clone(), - signer.clone(), - epoch_state.clone(), - storage.clone(), - ), - dag_driver: DagDriver::new( - signer.author(), - epoch_state.clone(), - dag.clone(), - payload_client, - rb, - 1, - time_service, - storage, - order_rule, - fetch_requester, - ), - epoch_state: epoch_state.clone(), - fetch_receiver: FetchRequestHandler::new(dag, epoch_state), + node_receiver, + dag_driver, + fetch_receiver, node_fetch_waiter, certified_node_fetch_waiter, } } - async fn start(mut self) { + pub async fn start(mut self) { self.dag_driver.try_enter_new_round(); // TODO(ibalajiarun): clean up Reliable Broadcast storage periodically. diff --git a/consensus/src/dag/mod.rs b/consensus/src/dag/mod.rs index b05f59d86d778..7b96f4384e315 100644 --- a/consensus/src/dag/mod.rs +++ b/consensus/src/dag/mod.rs @@ -3,6 +3,7 @@ #![allow(dead_code)] mod anchor_election; +mod bootstrap; mod dag_driver; mod dag_fetcher; mod dag_handler; diff --git a/consensus/src/dag/rb_handler.rs b/consensus/src/dag/rb_handler.rs index 363218978777b..09bddf19623be 100644 --- a/consensus/src/dag/rb_handler.rs +++ b/consensus/src/dag/rb_handler.rs @@ -25,7 +25,7 @@ pub enum NodeBroadcastHandleError { NotEnoughParents, } -pub struct NodeBroadcastHandler { +pub(crate) struct NodeBroadcastHandler { dag: Arc>, votes_by_round_peer: BTreeMap>, signer: ValidatorSigner, From 9913b41e1b63c0fc6fbe88374ad755df80ad7b71 Mon Sep 17 00:00:00 2001 From: Gerardo Di Giacomo Date: Mon, 14 Aug 2023 12:53:24 -0700 Subject: [PATCH 2/2] Update workflows (#9650) * Update semgrep.yaml to also run daily * update semgrep rule * fix workflows * Update .github/workflows/semgrep.yaml Co-authored-by: Balaji Arun --------- Co-authored-by: Balaji Arun --- .../semgrep/pull-request-target-code-checkout.yaml | 9 +++++++++ .github/workflows/docker-build-test.yaml | 1 + .github/workflows/semgrep.yaml | 2 ++ .github/workflows/ts-sdk-e2e-tests.yaml | 1 + 4 files changed, 13 insertions(+) diff --git a/.github/linters/semgrep/pull-request-target-code-checkout.yaml b/.github/linters/semgrep/pull-request-target-code-checkout.yaml index 1348d505f6c36..a6186a753ab37 100644 --- a/.github/linters/semgrep/pull-request-target-code-checkout.yaml +++ b/.github/linters/semgrep/pull-request-target-code-checkout.yaml @@ -47,6 +47,15 @@ rules: ... $JOBNAME: ... + - pattern-not-inside: | + needs: [..., permission-check, ...] + ... + - pattern-not-inside: | + needs: + ... + - permission-check + ... + ... - pattern-not-inside: | needs: [permission-check] ... diff --git a/.github/workflows/docker-build-test.yaml b/.github/workflows/docker-build-test.yaml index 6a7f0f15c4027..0d373bc32a036 100644 --- a/.github/workflows/docker-build-test.yaml +++ b/.github/workflows/docker-build-test.yaml @@ -111,6 +111,7 @@ jobs: # This job determines which files were changed file_change_determinator: + needs: [permission-check] runs-on: ubuntu-latest outputs: only_docs_changed: ${{ steps.determine_file_changes.outputs.only_docs_changed }} diff --git a/.github/workflows/semgrep.yaml b/.github/workflows/semgrep.yaml index 320f35904f60e..9505c7b3b2b9c 100644 --- a/.github/workflows/semgrep.yaml +++ b/.github/workflows/semgrep.yaml @@ -4,6 +4,8 @@ on: workflow_dispatch: pull_request: types: [labeled, opened, synchronize, reopened, auto_merge_enabled] + schedule: + - cron: '0 * * * *' jobs: semgrep: diff --git a/.github/workflows/ts-sdk-e2e-tests.yaml b/.github/workflows/ts-sdk-e2e-tests.yaml index 8a4b53419af29..1a9c168e5dbbf 100644 --- a/.github/workflows/ts-sdk-e2e-tests.yaml +++ b/.github/workflows/ts-sdk-e2e-tests.yaml @@ -34,6 +34,7 @@ jobs: # This job determines which files were changed file_change_determinator: + needs: [permission-check] runs-on: ubuntu-latest outputs: only_docs_changed: ${{ steps.determine_file_changes.outputs.only_docs_changed }}