diff --git a/consensus/src/dag/adapter.rs b/consensus/src/dag/adapter.rs index 5d4f58415eeea1..3ea876970490a9 100644 --- a/consensus/src/dag/adapter.rs +++ b/consensus/src/dag/adapter.rs @@ -10,7 +10,6 @@ use aptos_consensus_types::{ common::{Author, Payload, Round}, executed_block::ExecutedBlock, }; -use aptos_crypto::HashValue; use aptos_executor_types::StateComputeResult; use aptos_logger::error; use aptos_types::{ @@ -65,7 +64,7 @@ impl Notifier for BufferManagerAdapter { Ok(self.executor_channel.unbounded_send(OrderedBlocks { ordered_blocks: vec![block], ordered_proof: LedgerInfoWithSignatures::new( - LedgerInfo::new(block_info, HashValue::zero()), + LedgerInfo::new(block_info, anchor.digest()), AggregateSignature::empty(), ), callback: Box::new( diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index 95dc77e98194b0..8954ed5e452f80 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -3,7 +3,7 @@ use super::{ anchor_election::RoundRobinAnchorElection, dag_driver::DagDriver, - dag_fetcher::{DagFetcher, FetchRequestHandler}, + dag_fetcher::{DagFetcherService, FetchRequestHandler}, dag_handler::NetworkHandler, dag_network::TDAGNetworkSender, dag_store::Dag, @@ -76,7 +76,7 @@ pub fn bootstrap_dag( ); let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) = - DagFetcher::new( + DagFetcherService::new( epoch_state.clone(), dag_network_sender, dag.clone(), diff --git a/consensus/src/dag/dag_fetcher.rs b/consensus/src/dag/dag_fetcher.rs index 0ddddf30f8a6d6..2cbf04a95448fa 100644 --- a/consensus/src/dag/dag_fetcher.rs +++ b/consensus/src/dag/dag_fetcher.rs @@ -1,18 +1,23 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::{dag_network::RpcWithFallback, types::NodeMetadata, RpcHandler}; +use super::{ + dag_network::RpcWithFallback, + types::{NodeMetadata, RemoteFetchRequestTargets}, + RpcHandler, +}; use crate::dag::{ dag_network::TDAGNetworkSender, dag_store::Dag, types::{CertifiedNode, FetchResponse, Node, RemoteFetchRequest}, }; -use anyhow::ensure; +use anyhow::{anyhow, ensure}; use aptos_consensus_types::common::Author; -use aptos_infallible::RwLock; +use aptos_infallible::{RwLock, RwLockReadGuard}; use aptos_logger::error; use aptos_time_service::TimeService; use aptos_types::epoch_state::EpochState; +use async_trait::async_trait; use futures::{stream::FuturesUnordered, Stream, StreamExt}; use std::{ collections::HashMap, @@ -124,15 +129,13 @@ impl LocalFetchRequest { } } -pub struct DagFetcher { - epoch_state: Arc, - network: Arc, - dag: Arc>, +pub struct DagFetcherService { + inner: DagFetcher, + ordered_authors: Vec, request_rx: Receiver, - time_service: TimeService, } -impl DagFetcher { +impl DagFetcherService { pub fn new( epoch_state: Arc, network: Arc, @@ -147,13 +150,12 @@ impl DagFetcher { let (request_tx, request_rx) = tokio::sync::mpsc::channel(16); let (node_tx, node_rx) = tokio::sync::mpsc::channel(100); let (certified_node_tx, certified_node_rx) = tokio::sync::mpsc::channel(100); + let ordered_authors = epoch_state.verifier.get_ordered_account_addresses(); ( Self { - epoch_state, - network, - dag, + inner: DagFetcher::new(epoch_state, network, dag, time_service), request_rx, - time_service, + ordered_authors, }, FetchRequester { request_tx, @@ -167,75 +169,130 @@ impl DagFetcher { pub async fn start(mut self) { while let Some(local_request) = self.request_rx.recv().await { - let responders = local_request - .responders(&self.epoch_state.verifier.get_ordered_account_addresses()); - let remote_request = { - let dag_reader = self.dag.read(); - - let missing_parents: Vec = dag_reader - .filter_missing(local_request.node().parents_metadata()) - .cloned() - .collect(); - - if missing_parents.is_empty() { - local_request.notify(); - continue; - } - - let target = local_request.node(); - RemoteFetchRequest::new( - target.metadata().epoch(), - missing_parents, - dag_reader.bitmask(local_request.node().round()), + match self + .fetch_for_node( + local_request.node(), + local_request.responders(&self.ordered_authors), ) - }; - - let mut rpc = RpcWithFallback::new( - responders, - remote_request.clone().into(), - Duration::from_millis(500), - Duration::from_secs(1), - self.network.clone(), - self.time_service.clone(), - ); - while let Some(response) = rpc.next().await { - if let Ok(response) = - response - .and_then(FetchResponse::try_from) - .and_then(|response| { - response.verify(&remote_request, &self.epoch_state.verifier) - }) + .await + { + Ok(_) => local_request.notify(), + Err(err) => error!("unable to complete fetch successfully: {}", err), + } + } + } + + pub(super) async fn fetch_for_node( + &mut self, + node: &Node, + responders: Vec, + ) -> anyhow::Result<()> { + let remote_request = { + let dag_reader = self.inner.dag.read(); + + let missing_parents: Vec = dag_reader + .filter_missing(node.parents_metadata()) + .cloned() + .collect(); + + if missing_parents.is_empty() { + return Ok(()); + } + + RemoteFetchRequest::new( + node.metadata().epoch(), + RemoteFetchRequestTargets::ByMetadata(missing_parents), + dag_reader.bitmask(node.round()), + ) + }; + self.inner + .fetch(remote_request, responders, |dag_reader| { + dag_reader.all_exists(node.parents_metadata()) + }) + .await + } +} + +#[async_trait] +pub trait TDagFetcher { + async fn fetch( + &self, + remote_request: RemoteFetchRequest, + responders: Vec, + predicate: impl Fn(RwLockReadGuard) -> bool + Send, + ) -> anyhow::Result<()>; +} + +pub(crate) struct DagFetcher { + network: Arc, + time_service: TimeService, + epoch_state: Arc, + dag: Arc>, +} + +impl DagFetcher { + pub(crate) fn new( + epoch_state: Arc, + network: Arc, + dag: Arc>, + time_service: TimeService, + ) -> Self { + Self { + network, + time_service, + epoch_state, + dag, + } + } +} + +#[async_trait] +impl TDagFetcher for DagFetcher { + async fn fetch( + &self, + remote_request: RemoteFetchRequest, + responders: Vec, + predicate: impl Fn(RwLockReadGuard) -> bool + Send, + ) -> anyhow::Result<()> { + let mut rpc = RpcWithFallback::new( + responders, + remote_request.clone().into(), + Duration::from_millis(500), + Duration::from_secs(1), + self.network.clone(), + self.time_service.clone(), + ); + + // TODO retry + while let Some(response) = rpc.next().await { + if let Ok(response) = response + .and_then(FetchResponse::try_from) + .and_then(|response| response.verify(&remote_request, &self.epoch_state.verifier)) + { + let certified_nodes = response.certified_nodes(); + // TODO: support chunk response or fallback to state sync { - let certified_nodes = response.certified_nodes(); - // TODO: support chunk response or fallback to state sync - { - let mut dag_writer = self.dag.write(); - for node in certified_nodes { - if let Err(e) = dag_writer.add_node(node) { - error!("Failed to add node {}", e); - } + let mut dag_writer = self.dag.write(); + for node in certified_nodes { + if let Err(e) = dag_writer.add_node(node) { + error!("Failed to add node {}", e); } } + } - if self - .dag - .read() - .all_exists(local_request.node().parents_metadata()) - { - local_request.notify(); - break; - } + if predicate(self.dag.read()) { + return Ok(()); } } - // TODO retry } + Err(anyhow!("fetch failed")) } } #[derive(Debug, ThisError)] pub enum FetchRequestHandleError { - #[error("parents are missing")] - ParentsMissing, + #[error("target nodes are missing")] + TargetsMissing, } pub struct FetchRequestHandler { @@ -259,19 +316,31 @@ impl RpcHandler for FetchRequestHandler { fn process(&mut self, message: Self::Request) -> anyhow::Result { let dag_reader = self.dag.read(); - // `Certified Node`: In the good case, there should exist at least one honest validator that - // signed the Certified Node that has the all the parents to fulfil this - // request. - // `Node`: In the good case, the sender of the Node should have the parents in its local DAG - // to satisfy this request. - ensure!( - dag_reader.all_exists(message.targets().iter()), - FetchRequestHandleError::ParentsMissing - ); + let targets = match message.targets() { + RemoteFetchRequestTargets::ByMetadata(target_metadata) => { + // `Certified Node`: In the good case, there should exist at least one honest validator that + // signed the Certified Node that has the all the parents to fulfil this + // request. + // `Node`: In the good case, the sender of the Node should have the parents in its local DAG + // to satisfy this request. + ensure!( + dag_reader.all_exists(target_metadata.iter()), + FetchRequestHandleError::TargetsMissing + ); + + target_metadata.clone() + }, + RemoteFetchRequestTargets::ByRoundDigest(round, digest) => { + vec![dag_reader + .get_node_by_round_digest(*round, *digest) + .map(|node| node.metadata().clone()) + .ok_or_else(|| anyhow::anyhow!("unable to find node"))?] + }, + }; let certified_nodes: Vec<_> = dag_reader .reachable( - message.targets(), + &targets, Some(message.exists_bitmask().first_round()), |_| true, ) diff --git a/consensus/src/dag/dag_network.rs b/consensus/src/dag/dag_network.rs index b56511d961d730..be01b934de0139 100644 --- a/consensus/src/dag/dag_network.rs +++ b/consensus/src/dag/dag_network.rs @@ -4,6 +4,7 @@ use super::types::DAGMessage; use aptos_consensus_types::common::Author; use aptos_reliable_broadcast::RBNetworkSender; use aptos_time_service::{Interval, TimeService, TimeServiceTrait}; +use aptos_types::{epoch_change::EpochChangeProof, ledger_info::LedgerInfoWithSignatures}; use async_trait::async_trait; use futures::{ stream::{FusedStream, FuturesUnordered}, @@ -42,6 +43,10 @@ pub trait TDAGNetworkSender: Send + Sync + RBNetworkSender { retry_interval: Duration, rpc_timeout: Duration, ) -> RpcWithFallback; + + async fn send_epoch_change(&self, proof: EpochChangeProof); + + async fn send_commit_proof(&self, ledger_info: LedgerInfoWithSignatures); } struct Responders { diff --git a/consensus/src/dag/dag_state_sync.rs b/consensus/src/dag/dag_state_sync.rs new file mode 100644 index 00000000000000..94d5db0ca6dde7 --- /dev/null +++ b/consensus/src/dag/dag_state_sync.rs @@ -0,0 +1,156 @@ +use super::{ + dag_fetcher::{DagFetcher, TDagFetcher}, + dag_store::Dag, + storage::DAGStorage, + types::{CertifiedNodeWithLedgerInfo, RemoteFetchRequest, RemoteFetchRequestTargets}, + TDAGNetworkSender, +}; +use crate::state_replication::StateComputer; +use aptos_infallible::RwLock; +use aptos_logger::error; +use aptos_time_service::TimeService; +use aptos_types::{ + epoch_change::EpochChangeProof, epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, +}; +use std::sync::Arc; + +pub const DAG_WINDOW: u64 = 10; + +struct StateSyncManager { + epoch_state: Arc, + network: Arc, + time_service: TimeService, + state_computer: Arc, + storage: Arc, + dag_store: Arc>, +} + +impl StateSyncManager { + async fn sync( + &self, + node: &CertifiedNodeWithLedgerInfo, + ) -> anyhow::Result>>> { + self.sync_to_highest_commit_cert(node.ledger_info()).await; + self.sync_to_highest_ordered_anchor(node).await + } + + /// Fast forward in the decoupled-execution pipeline if the block exists there + async fn sync_to_highest_commit_cert(&self, ledger_info: &LedgerInfoWithSignatures) { + let dag_reader = self.dag_store.read(); + + // if the anchor exists between ledger info round and highest ordered round + // Note: ledger info round <= highest ordered round + if dag_reader.highest_committed_round().unwrap_or_default() + < ledger_info.commit_info().round() + && dag_reader.exists_by_round_digest( + ledger_info.commit_info().round(), + ledger_info.ledger_info().consensus_data_hash(), + ) + && dag_reader.highest_ordered_round().unwrap_or_default() + >= ledger_info.commit_info().round() + { + self.network.send_commit_proof(ledger_info.clone()).await + } + } + + /// Check if we're far away from this ledger info and need to sync. + /// This ensures that the block referred by the ledger info is not in buffer manager. + pub fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool { + let dag_reader = self.dag_store.read(); + (dag_reader.highest_ordered_round().unwrap_or_default() < li.commit_info().round() + && !dag_reader.exists_by_round_digest( + li.commit_info().round(), + li.ledger_info().consensus_data_hash(), + )) + || dag_reader.highest_committed_round().unwrap_or_default() + 2 * DAG_WINDOW + < li.commit_info().round() + } + + async fn sync_to_highest_ordered_anchor( + &self, + node: &CertifiedNodeWithLedgerInfo, + ) -> anyhow::Result>>> { + // Check whether to actually sync + let commit_li = node.ledger_info(); + if !self.need_sync_for_ledger_info(commit_li) { + return Ok(None); + } + + // TODO: there is a case where DAG fetches missing nodes in window and a crash happens and when we restart, + // we end up with a gap between the DAG and we need to be smart enough to clean up the DAG before the gap. + + // Create a new DAG store and Fetch blocks + let target_round = node.round(); + let start_round = commit_li.commit_info().round().saturating_sub(DAG_WINDOW); + let sync_dag_store = Arc::new(RwLock::new(Dag::new( + self.epoch_state.clone(), + self.storage.clone(), + start_round, + ))); + let targets = RemoteFetchRequestTargets::ByRoundDigest( + target_round, + commit_li.ledger_info().consensus_data_hash(), + ); + let bitmask = { sync_dag_store.read().bitmask(target_round) }; + let request = RemoteFetchRequest::new(self.epoch_state.epoch, targets, bitmask); + + let responders = node + .certificate() + .signatures() + .get_signers_addresses(&self.epoch_state.verifier.get_ordered_account_addresses()); + + let dag_fetcher = DagFetcher::new( + self.epoch_state.clone(), + self.network.clone(), + sync_dag_store.clone(), + self.time_service.clone(), + ); + + match dag_fetcher + .fetch(request, responders, |dag_reader| { + dag_reader + .get_node_by_round_digest( + target_round, + commit_li.ledger_info().consensus_data_hash(), + ) + .is_some() + }) + .await + { + Ok(_) => {}, + Err(err) => { + error!("error fetching nodes {}", err); + return Err(err); + }, + } + + // State sync + self.state_computer.sync_to(commit_li.clone()).await?; + + { + let mut dag_writer = sync_dag_store.write(); + dag_writer.prune(); + if let Some(node_status) = dag_writer.get_node_ref_mut_by_round_digest( + commit_li.ledger_info().round(), + commit_li.ledger_info().consensus_data_hash(), + ) { + node_status.mark_as_committed(); + } + if let Some(node_status) = + dag_writer.get_node_ref_mut(node.round(), node.author()) + { + node_status.mark_as_ordered(); + } + } + + if commit_li.ledger_info().ends_epoch() { + self.network + .send_epoch_change(EpochChangeProof::new( + vec![commit_li.clone()], + /* more = */ false, + )) + .await; + } + Ok(Some(sync_dag_store)) + } +} diff --git a/consensus/src/dag/dag_store.rs b/consensus/src/dag/dag_store.rs index 67fb60ba952285..bf5af78a191cbf 100644 --- a/consensus/src/dag/dag_store.rs +++ b/consensus/src/dag/dag_store.rs @@ -36,6 +36,12 @@ impl NodeStatus { assert!(matches!(self, NodeStatus::Unordered(_))); *self = NodeStatus::Ordered(self.as_node().clone()); } + + pub fn mark_as_committed(&mut self) { + assert!(!matches!(self, NodeStatus::Committed(_))); + // TODO: try to avoid clone + *self = NodeStatus::Committed(self.as_node().clone()); + } } /// Data structure that stores the DAG representation, it maintains round based index. @@ -46,6 +52,7 @@ pub struct Dag { author_to_index: HashMap, storage: Arc, initial_round: Round, + epoch_state: Arc, } impl Dag { @@ -83,6 +90,7 @@ impl Dag { author_to_index, storage, initial_round, + epoch_state, } } @@ -131,10 +139,21 @@ impl Dag { self.get_node_ref_by_metadata(metadata).is_some() } + pub fn exists_by_round_digest(&self, round: Round, digest: HashValue) -> bool { + self.get_node_by_round_digest(round, digest).is_some() + } + pub fn all_exists<'a>(&self, nodes: impl Iterator) -> bool { self.filter_missing(nodes).next().is_none() } + pub fn all_exists_by_round_author<'a>( + &self, + mut nodes: impl Iterator, + ) -> bool { + nodes.all(|(round, author)| self.get_node_ref(*round, author).is_some()) + } + pub fn filter_missing<'a, 'b>( &'b self, nodes: impl Iterator + 'b, @@ -142,22 +161,55 @@ impl Dag { nodes.filter(|node_metadata| !self.exists(node_metadata)) } + pub fn get_node_ref_by_round_digest( + &self, + round: Round, + digest: HashValue, + ) -> Option<&NodeStatus> { + self.get_round_iter(round)? + .find(|node_status| node_status.as_node().digest() == digest) + } + + pub fn get_node_ref_mut_by_round_digest( + &mut self, + round: Round, + digest: HashValue, + ) -> Option<&mut NodeStatus> { + self.get_round_iter_mut(round)? + .find(|node_status| node_status.as_node().digest() == digest) + } + fn get_node_ref_by_metadata(&self, metadata: &NodeMetadata) -> Option<&NodeStatus> { self.get_node_ref(metadata.round(), metadata.author()) } - fn get_node_ref(&self, round: Round, author: &Author) -> Option<&NodeStatus> { + pub fn get_node_ref(&self, round: Round, author: &Author) -> Option<&NodeStatus> { let index = self.author_to_index.get(author)?; let round_ref = self.nodes_by_round.get(&round)?; round_ref[*index].as_ref() } + pub fn get_node_ref_mut(&mut self, round: Round, author: &Author) -> Option<&mut NodeStatus> { + let index = self.author_to_index.get(author)?; + let round_ref = self.nodes_by_round.get_mut(&round)?; + round_ref[*index].as_mut() + } + fn get_round_iter(&self, round: Round) -> Option> { self.nodes_by_round .get(&round) .map(|round_ref| round_ref.iter().flatten()) } + fn get_round_iter_mut( + &mut self, + round: Round, + ) -> Option> { + self.nodes_by_round + .get_mut(&round) + .map(|round_ref| round_ref.iter_mut().flatten()) + } + pub fn get_node(&self, metadata: &NodeMetadata) -> Option> { self.get_node_ref_by_metadata(metadata) .map(|node_status| node_status.as_node().clone()) @@ -172,6 +224,15 @@ impl Dag { .map(|node_status| node_status.as_node()) } + pub fn get_node_by_round_digest( + &self, + round: Round, + digest: HashValue, + ) -> Option<&Arc> { + self.get_node_ref_by_round_digest(round, digest) + .map(|node_status| node_status.as_node()) + } + // TODO: I think we can cache votes in the NodeStatus::Unordered pub fn check_votes_for_node( &self, @@ -296,4 +357,43 @@ impl Dag { DagSnapshotBitmask::new(lowest_round, bitmask) } + + pub(super) fn prune(&mut self) { + let all_nodes = self.storage.get_certified_nodes().unwrap_or_default(); + let mut expired = vec![]; + for (digest, certified_node) in all_nodes { + if certified_node.metadata().epoch() != self.epoch_state.epoch + || certified_node.metadata().round() < self.initial_round + { + expired.push(digest); + self.nodes_by_round + .remove(&certified_node.metadata().round()); + } + } + if let Err(e) = self.storage.delete_certified_nodes(expired) { + error!("Error deleting expired nodes: {:?}", e); + } + } + + pub(super) fn highest_ordered_round(&self) -> Option { + for (round, round_nodes) in self.nodes_by_round.iter().rev() { + for maybe_node_status in round_nodes { + if matches!(maybe_node_status, Some(NodeStatus::Ordered(_))) { + return Some(*round); + } + } + } + None + } + + pub(super) fn highest_committed_round(&self) -> Option { + for (round, round_nodes) in self.nodes_by_round.iter() { + for maybe_node_status in round_nodes { + if matches!(maybe_node_status, Some(NodeStatus::Committed(_))) { + return Some(*round); + } + } + } + None + } } diff --git a/consensus/src/dag/mod.rs b/consensus/src/dag/mod.rs index 5b2c488ca84a4b..91985936e74ff5 100644 --- a/consensus/src/dag/mod.rs +++ b/consensus/src/dag/mod.rs @@ -16,6 +16,7 @@ mod storage; #[cfg(test)] mod tests; mod types; +mod dag_state_sync; pub use dag_network::{RpcHandler, RpcWithFallback, TDAGNetworkSender}; pub use types::{CertifiedNode, DAGMessage, DAGNetworkMessage, Extensions, Node, NodeId, Vote}; diff --git a/consensus/src/dag/order_rule.rs b/consensus/src/dag/order_rule.rs index 4b2333eaf5ffd3..cfb730aefa6246 100644 --- a/consensus/src/dag/order_rule.rs +++ b/consensus/src/dag/order_rule.rs @@ -76,7 +76,7 @@ impl OrderRule { .dag .write() .reachable_mut(&anchor, None) - .for_each(|node_statue| node_statue.mark_as_ordered()); + .for_each(|node_status| node_status.mark_as_ordered()); } } else { // re-process pending anchors diff --git a/consensus/src/dag/tests/dag_driver_tests.rs b/consensus/src/dag/tests/dag_driver_tests.rs index 940dfc5c9fc20d..6a50674931bf9c 100644 --- a/consensus/src/dag/tests/dag_driver_tests.rs +++ b/consensus/src/dag/tests/dag_driver_tests.rs @@ -4,7 +4,7 @@ use crate::{ dag::{ anchor_election::RoundRobinAnchorElection, dag_driver::{DagDriver, DagDriverError}, - dag_fetcher::DagFetcher, + dag_fetcher::DagFetcherService, dag_network::{RpcWithFallback, TDAGNetworkSender}, dag_store::Dag, order_rule::OrderRule, @@ -21,7 +21,7 @@ use aptos_infallible::RwLock; use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast}; use aptos_time_service::TimeService; use aptos_types::{ - epoch_state::EpochState, ledger_info::LedgerInfo, validator_verifier::random_validator_verifier, + epoch_state::EpochState, ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, validator_verifier::random_validator_verifier, epoch_change::EpochChangeProof, }; use async_trait::async_trait; use claims::{assert_ok, assert_ok_eq}; @@ -65,6 +65,14 @@ impl TDAGNetworkSender for MockNetworkSender { ) -> RpcWithFallback { unimplemented!() } + + async fn send_epoch_change(&self, _proof: EpochChangeProof) { + unimplemented!() + } + + async fn send_commit_proof(&self, _li: LedgerInfoWithSignatures) { + unimplemented!() + } } #[test] @@ -100,7 +108,7 @@ fn test_certified_node_handler() { storage.clone(), ); - let (_, fetch_requester, _, _) = DagFetcher::new( + let (_, fetch_requester, _, _) = DagFetcherService::new( epoch_state.clone(), network_sender, dag.clone(), diff --git a/consensus/src/dag/tests/dag_network_test.rs b/consensus/src/dag/tests/dag_network_test.rs index 2bf07bd8c1db90..038c53360ba160 100644 --- a/consensus/src/dag/tests/dag_network_test.rs +++ b/consensus/src/dag/tests/dag_network_test.rs @@ -9,7 +9,7 @@ use aptos_consensus_types::common::Author; use aptos_infallible::Mutex; use aptos_reliable_broadcast::RBNetworkSender; use aptos_time_service::{TimeService, TimeServiceTrait}; -use aptos_types::validator_verifier::random_validator_verifier; +use aptos_types::{validator_verifier::random_validator_verifier, epoch_change::EpochChangeProof, ledger_info::LedgerInfoWithSignatures}; use async_trait::async_trait; use claims::{assert_err, assert_ok}; use futures::StreamExt; @@ -85,6 +85,14 @@ impl TDAGNetworkSender for MockDAGNetworkSender { self.time_service.clone(), ) } + + async fn send_epoch_change(&self, _proof: EpochChangeProof) { + unimplemented!() + } + + async fn send_commit_proof(&self, _ledger_info: LedgerInfoWithSignatures) { + unimplemented!() + } } #[tokio::test] diff --git a/consensus/src/dag/tests/dag_state_sync_tests.rs b/consensus/src/dag/tests/dag_state_sync_tests.rs new file mode 100644 index 00000000000000..e306c484739551 --- /dev/null +++ b/consensus/src/dag/tests/dag_state_sync_tests.rs @@ -0,0 +1,4 @@ +#[test] +fn test_dag_state_sync() { + +} \ No newline at end of file diff --git a/consensus/src/dag/tests/fetcher_test.rs b/consensus/src/dag/tests/fetcher_test.rs index 113c3b8d9e501b..002000819a9b81 100644 --- a/consensus/src/dag/tests/fetcher_test.rs +++ b/consensus/src/dag/tests/fetcher_test.rs @@ -46,7 +46,8 @@ fn test_dag_fetcher_receiver() { .parents() .iter() .map(|parent| parent.metadata().clone()) - .collect(), + .collect::>() + .into(), DagSnapshotBitmask::new(1, vec![vec![true, false]]), ); assert_ok_eq!( diff --git a/consensus/src/dag/tests/types_test.rs b/consensus/src/dag/tests/types_test.rs index 786a91da9d0ba6..20abef34c2c83f 100644 --- a/consensus/src/dag/tests/types_test.rs +++ b/consensus/src/dag/tests/types_test.rs @@ -99,7 +99,7 @@ fn test_remote_fetch_request() { let request = RemoteFetchRequest::new( 1, - parents.clone(), + parents.clone().into(), DagSnapshotBitmask::new(1, vec![vec![false; 5]]), ); assert_eq!( @@ -109,14 +109,14 @@ fn test_remote_fetch_request() { let request = RemoteFetchRequest::new( 1, - vec![parents[0].clone()], + vec![parents[0].clone()].into(), DagSnapshotBitmask::new(1, vec![vec![false; signers.len()]]), ); assert_ok!(request.verify(&validator_verifier)); let request = RemoteFetchRequest::new( 1, - parents, + parents.into(), DagSnapshotBitmask::new(1, vec![vec![false; signers.len()]]), ); assert_ok!(request.verify(&validator_verifier)); diff --git a/consensus/src/dag/types.rs b/consensus/src/dag/types.rs index 405acff149701e..a0d6436ff80ebf 100644 --- a/consensus/src/dag/types.rs +++ b/consensus/src/dag/types.rs @@ -17,7 +17,7 @@ use aptos_types::{ aggregate_signature::{AggregateSignature, PartialSignatures}, epoch_state::EpochState, validator_signer::ValidatorSigner, - validator_verifier::ValidatorVerifier, + validator_verifier::ValidatorVerifier, ledger_info::LedgerInfoWithSignatures, }; use serde::{Deserialize, Serialize}; use std::{collections::HashSet, ops::Deref, sync::Arc}; @@ -418,6 +418,33 @@ impl TDAGMessage for CertifiedNode { } } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct CertifiedNodeWithLedgerInfo { + inner: CertifiedNode, + ledger_info: LedgerInfoWithSignatures, +} + +impl CertifiedNodeWithLedgerInfo { + fn new(node: CertifiedNode, ledger_info: LedgerInfoWithSignatures) -> Self { + Self { + inner: node, + ledger_info + } + } + + pub fn ledger_info(&self) -> &LedgerInfoWithSignatures { + &self.ledger_info + } +} + +impl Deref for CertifiedNodeWithLedgerInfo { + type Target = CertifiedNode; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct Vote { metadata: NodeMetadata, @@ -527,21 +554,37 @@ where } } +#[derive(Serialize, Deserialize, Clone, Debug)] +pub enum RemoteFetchRequestTargets { + ByMetadata(Vec), + ByRoundDigest(Round, HashValue), +} + +impl From> for RemoteFetchRequestTargets { + fn from(value: Vec) -> Self { + Self::ByMetadata(value) + } +} + /// Represents a request to fetch missing dependencies for `target`, `start_round` represents /// the first round we care about in the DAG, `exists_bitmask` is a two dimensional bitmask represents /// if a node exist at [start_round + index][validator_index]. #[derive(Serialize, Deserialize, Clone, Debug)] pub struct RemoteFetchRequest { epoch: u64, - targets: Vec, + targets: RemoteFetchRequestTargets, exists_bitmask: DagSnapshotBitmask, } impl RemoteFetchRequest { - pub fn new(epoch: u64, parents: Vec, exists_bitmask: DagSnapshotBitmask) -> Self { + pub fn new( + epoch: u64, + targets: RemoteFetchRequestTargets, + exists_bitmask: DagSnapshotBitmask, + ) -> Self { Self { epoch, - targets: parents, + targets, exists_bitmask, } } @@ -550,7 +593,7 @@ impl RemoteFetchRequest { self.epoch } - pub fn targets(&self) -> &[NodeMetadata] { + pub fn targets(&self) -> &RemoteFetchRequestTargets { &self.targets } diff --git a/consensus/src/network.rs b/consensus/src/network.rs index ba178dd43ffac4..0f7ce2bb4aec50 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -323,7 +323,7 @@ impl NetworkSender { self.send(msg, recipients).await } - pub async fn send_epoch_change(&mut self, proof: EpochChangeProof) { + pub async fn send_epoch_change(&self, proof: EpochChangeProof) { fail_point!("consensus::send::epoch_change", |_| ()); let msg = ConsensusMsg::EpochChangeProof(Box::new(proof)); self.send(msg, vec![self.author]).await @@ -459,6 +459,15 @@ impl TDAGNetworkSender for DAGNetworkSenderImpl { self.time_service.clone(), ) } + + async fn send_epoch_change(&self, proof: EpochChangeProof) { + self.sender.send_epoch_change(proof).await + } + + /// Sends the ledger info to self buffer manager + async fn send_commit_proof(&self, ledger_info: LedgerInfoWithSignatures) { + self.sender.send_commit_proof(ledger_info).await + } } #[async_trait]