diff --git a/consensus/src/liveness/proposal_generator.rs b/consensus/src/liveness/proposal_generator.rs index 7ce9b5836fc2c..0c08782fcc9de 100644 --- a/consensus/src/liveness/proposal_generator.rs +++ b/consensus/src/liveness/proposal_generator.rs @@ -271,7 +271,7 @@ pub struct ProposalGenerator { allow_batches_without_pos_in_proposal: bool, opt_qs_payload_param_provider: Arc, - last_proposed_proposal: Arc>>, + proposed_proposals: Arc>, } impl ProposalGenerator { @@ -311,7 +311,7 @@ impl ProposalGenerator { vtxn_config, allow_batches_without_pos_in_proposal, opt_qs_payload_param_provider, - last_proposed_proposal: Arc::new(Mutex::new(None)), + proposed_proposals: Arc::new(DashMap::new()), } } @@ -354,15 +354,18 @@ impl ProposalGenerator { wait_callback: BoxFuture<'static, ()>, is_opt: bool, ) -> anyhow::Result> { - if let Some((proposed_block, proposed_opt)) = &mut *self.last_proposed_proposal.lock() { - if round < proposed_block.round() { - return Err(format_err!( - "OldRound: requested round {} is less than last proposed round {}", - round, - proposed_block.round() - )); - } - if round == proposed_block.round() { + let latest_proposed_round = self.proposed_proposals.iter().map(|entry| *entry.key()).max(); + if round < latest_proposed_round.unwrap_or(0) { + bail!( + "Round {} is lower than the latest proposed round {}", + round, + latest_proposed_round.unwrap_or(0) + ); + } + self.proposed_proposals.retain(|r, _| *r < round); + match self.proposed_proposals.entry(round) { + dashmap::mapref::entry::Entry::Occupied(mut entry) => { + let (proposed_block, proposed_opt) = entry.get(); if is_opt || !*proposed_opt { return Ok(None); } @@ -396,179 +399,179 @@ impl ProposalGenerator { hqc, ) }; - *proposed_block = block_data.clone(); - *proposed_opt = is_opt; + entry.insert((block_data.clone(), is_opt)); - return Ok(Some(block_data)); + Ok(Some(block_data)) } - }; + dashmap::mapref::entry::Entry::Vacant(entry) => { + let maybe_optqs_payload_pull_params = self.opt_qs_payload_param_provider.get_params(); + + let hqc = self.ensure_highest_quorum_cert(round)?; + + let (validator_txns, payload, timestamp) = if hqc.certified_block().has_reconfiguration() { + // Reconfiguration rule - we propose empty blocks with parents' timestamp + // after reconfiguration until it's committed + ( + vec![], + Payload::empty( + self.quorum_store_enabled, + self.allow_batches_without_pos_in_proposal, + ), + hqc.certified_block().timestamp_usecs(), + ) + } else { + // One needs to hold the blocks with the references to the payloads while get_block is + // being executed: pending blocks vector keeps all the pending ancestors of the extended branch. + let mut pending_blocks = self + .block_store + .path_from_commit_root(hqc.certified_block().id()) + .ok_or_else(|| format_err!("HQC {} already pruned", hqc.certified_block().id()))?; + // Avoid txn manager long poll if the root block has txns, so that the leader can + // deliver the commit proof to others without delay. + pending_blocks.push(self.block_store.commit_root()); + + // Exclude all the pending transactions: these are all the ancestors of + // parent (including) up to the root (including). + let exclude_payload: Vec<_> = pending_blocks + .iter() + .flat_map(|block| block.payload()) + .collect(); + let payload_filter = PayloadFilter::from(&exclude_payload); + + let pending_ordering = self + .block_store + .path_from_ordered_root(hqc.certified_block().id()) + .ok_or_else(|| format_err!("HQC {} already pruned", hqc.certified_block().id()))? + .iter() + .any(|block| !block.payload().map_or(true, |txns| txns.is_empty())); - let maybe_optqs_payload_pull_params = self.opt_qs_payload_param_provider.get_params(); + // All proposed blocks in a branch are guaranteed to have increasing timestamps + // since their predecessor block will not be added to the BlockStore until + // the local time exceeds it. + let timestamp = self.time_service.get_current_timestamp(); - let hqc = self.ensure_highest_quorum_cert(round)?; + let voting_power_ratio = proposer_election.get_voting_power_participation_ratio(round); - let (validator_txns, payload, timestamp) = if hqc.certified_block().has_reconfiguration() { - // Reconfiguration rule - we propose empty blocks with parents' timestamp - // after reconfiguration until it's committed - ( - vec![], - Payload::empty( - self.quorum_store_enabled, - self.allow_batches_without_pos_in_proposal, - ), - hqc.certified_block().timestamp_usecs(), - ) - } else { - // One needs to hold the blocks with the references to the payloads while get_block is - // being executed: pending blocks vector keeps all the pending ancestors of the extended branch. - let mut pending_blocks = self - .block_store - .path_from_commit_root(hqc.certified_block().id()) - .ok_or_else(|| format_err!("HQC {} already pruned", hqc.certified_block().id()))?; - // Avoid txn manager long poll if the root block has txns, so that the leader can - // deliver the commit proof to others without delay. - pending_blocks.push(self.block_store.commit_root()); - - // Exclude all the pending transactions: these are all the ancestors of - // parent (including) up to the root (including). - let exclude_payload: Vec<_> = pending_blocks - .iter() - .flat_map(|block| block.payload()) - .collect(); - let payload_filter = PayloadFilter::from(&exclude_payload); - - let pending_ordering = self - .block_store - .path_from_ordered_root(hqc.certified_block().id()) - .ok_or_else(|| format_err!("HQC {} already pruned", hqc.certified_block().id()))? - .iter() - .any(|block| !block.payload().map_or(true, |txns| txns.is_empty())); - - // All proposed blocks in a branch are guaranteed to have increasing timestamps - // since their predecessor block will not be added to the BlockStore until - // the local time exceeds it. - let timestamp = self.time_service.get_current_timestamp(); - - let voting_power_ratio = proposer_election.get_voting_power_participation_ratio(round); - - let ( - max_block_txns, - max_block_txns_after_filtering, - max_txns_from_block_to_execute, - proposal_delay, - ) = self - .calculate_max_block_sizes(voting_power_ratio, timestamp, round) - .await; - - PROPOSER_MAX_BLOCK_TXNS_AFTER_FILTERING.observe(max_block_txns_after_filtering as f64); - if let Some(max_to_execute) = max_txns_from_block_to_execute { - PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE.observe(max_to_execute as f64); - } + let ( + max_block_txns, + max_block_txns_after_filtering, + max_txns_from_block_to_execute, + proposal_delay, + ) = self + .calculate_max_block_sizes(voting_power_ratio, timestamp, round) + .await; + + PROPOSER_MAX_BLOCK_TXNS_AFTER_FILTERING.observe(max_block_txns_after_filtering as f64); + if let Some(max_to_execute) = max_txns_from_block_to_execute { + PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE.observe(max_to_execute as f64); + } - PROPOSER_DELAY_PROPOSAL.observe(proposal_delay.as_secs_f64()); - if !proposal_delay.is_zero() { - tokio::time::sleep(proposal_delay).await; - } + PROPOSER_DELAY_PROPOSAL.observe(proposal_delay.as_secs_f64()); + if !proposal_delay.is_zero() { + tokio::time::sleep(proposal_delay).await; + } - let max_pending_block_size = pending_blocks - .iter() - .map(|block| { - block.payload().map_or(PayloadTxnsSize::zero(), |p| { - PayloadTxnsSize::new(p.len() as u64, p.size() as u64) - }) - }) - .reduce(PayloadTxnsSize::maximum) - .unwrap_or_default(); - // Use non-backpressure reduced values for computing fill_fraction - let max_fill_fraction = - (max_pending_block_size.count() as f32 / self.max_block_txns.count() as f32).max( - max_pending_block_size.size_in_bytes() as f32 - / self.max_block_txns.size_in_bytes() as f32, - ); - PROPOSER_PENDING_BLOCKS_COUNT.set(pending_blocks.len() as i64); - PROPOSER_PENDING_BLOCKS_FILL_FRACTION.set(max_fill_fraction as f64); + let max_pending_block_size = pending_blocks + .iter() + .map(|block| { + block.payload().map_or(PayloadTxnsSize::zero(), |p| { + PayloadTxnsSize::new(p.len() as u64, p.size() as u64) + }) + }) + .reduce(PayloadTxnsSize::maximum) + .unwrap_or_default(); + // Use non-backpressure reduced values for computing fill_fraction + let max_fill_fraction = + (max_pending_block_size.count() as f32 / self.max_block_txns.count() as f32).max( + max_pending_block_size.size_in_bytes() as f32 + / self.max_block_txns.size_in_bytes() as f32, + ); + PROPOSER_PENDING_BLOCKS_COUNT.set(pending_blocks.len() as i64); + PROPOSER_PENDING_BLOCKS_FILL_FRACTION.set(max_fill_fraction as f64); + + let pending_validator_txn_hashes: HashSet = pending_blocks + .iter() + .filter_map(|block| block.validator_txns()) + .flatten() + .map(ValidatorTransaction::hash) + .collect(); + let validator_txn_filter = + vtxn_pool::TransactionFilter::PendingTxnHashSet(pending_validator_txn_hashes); + + let (validator_txns, mut payload) = self + .payload_client + .pull_payload( + PayloadPullParameters { + max_poll_time: self.quorum_store_poll_time.saturating_sub(proposal_delay), + max_txns: max_block_txns, + max_txns_after_filtering: max_block_txns_after_filtering, + soft_max_txns_after_filtering: max_txns_from_block_to_execute + .unwrap_or(max_block_txns_after_filtering), + max_inline_txns: self.max_inline_txns, + maybe_optqs_payload_pull_params, + user_txn_filter: payload_filter, + pending_ordering, + pending_uncommitted_blocks: pending_blocks.len(), + recent_max_fill_fraction: max_fill_fraction, + block_timestamp: timestamp, + }, + validator_txn_filter, + wait_callback, + ) + .await + .context("Fail to retrieve payload")?; - let pending_validator_txn_hashes: HashSet = pending_blocks - .iter() - .filter_map(|block| block.validator_txns()) - .flatten() - .map(ValidatorTransaction::hash) - .collect(); - let validator_txn_filter = - vtxn_pool::TransactionFilter::PendingTxnHashSet(pending_validator_txn_hashes); - - let (validator_txns, mut payload) = self - .payload_client - .pull_payload( - PayloadPullParameters { - max_poll_time: self.quorum_store_poll_time.saturating_sub(proposal_delay), - max_txns: max_block_txns, - max_txns_after_filtering: max_block_txns_after_filtering, - soft_max_txns_after_filtering: max_txns_from_block_to_execute - .unwrap_or(max_block_txns_after_filtering), - max_inline_txns: self.max_inline_txns, - maybe_optqs_payload_pull_params, - user_txn_filter: payload_filter, - pending_ordering, - pending_uncommitted_blocks: pending_blocks.len(), - recent_max_fill_fraction: max_fill_fraction, - block_timestamp: timestamp, - }, - validator_txn_filter, - wait_callback, - ) - .await - .context("Fail to retrieve payload")?; - - if !payload.is_direct() - && max_txns_from_block_to_execute.is_some() - && max_txns_from_block_to_execute.map_or(false, |v| payload.len() as u64 > v) - { - payload = payload.transform_to_quorum_store_v2(max_txns_from_block_to_execute); - } - (validator_txns, payload, timestamp.as_micros() as u64) - }; + if !payload.is_direct() + && max_txns_from_block_to_execute.is_some() + && max_txns_from_block_to_execute.map_or(false, |v| payload.len() as u64 > v) + { + payload = payload.transform_to_quorum_store_v2(max_txns_from_block_to_execute); + } + (validator_txns, payload, timestamp.as_micros() as u64) + }; - let (quorum_cert, failed_authors) = match is_opt { - true => (QuorumCert::empty(), vec![]), - false => { - let hqc = hqc.as_ref().clone(); - let failed_authors = self.compute_failed_authors( - round, - hqc.certified_block().round(), - false, - proposer_election, - ); - (hqc, failed_authors) - }, - }; + let (quorum_cert, failed_authors) = match is_opt { + true => (QuorumCert::empty(), vec![]), + false => { + let hqc = hqc.as_ref().clone(); + let failed_authors = self.compute_failed_authors( + round, + hqc.certified_block().round(), + false, + proposer_election, + ); + (hqc, failed_authors) + }, + }; - let block = if self.vtxn_config.enabled() { - BlockData::new_proposal_ext( - validator_txns, - payload, - self.author, - failed_authors, - epoch, - round, - timestamp, - quorum_cert, - ) - } else { - BlockData::new_proposal( - payload, - self.author, - failed_authors, - epoch, - round, - timestamp, - quorum_cert, - ) - }; + let block = if self.vtxn_config.enabled() { + BlockData::new_proposal_ext( + validator_txns, + payload, + self.author, + failed_authors, + epoch, + round, + timestamp, + quorum_cert, + ) + } else { + BlockData::new_proposal( + payload, + self.author, + failed_authors, + epoch, + round, + timestamp, + quorum_cert, + ) + }; - self.last_proposed_proposal.lock().replace((block.clone(), is_opt)); + entry.insert((block.clone(), is_opt)); - Ok(Some(block)) + Ok(Some(block)) + }, + } } async fn calculate_max_block_sizes( diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs index b97d4bacb7aab..d3a89d6d1d844 100644 --- a/consensus/src/round_manager.rs +++ b/consensus/src/round_manager.rs @@ -15,7 +15,7 @@ use crate::{ proposal_generator::ProposalGenerator, proposal_status_tracker::TPastProposalStatusTracker, proposer_election::ProposerElection, - round_state::{NewRoundEvent, NewRoundReason, RoundState, RoundStateLogSchema}, + round_state::{self, NewRoundEvent, NewRoundReason, RoundState, RoundStateLogSchema}, unequivocal_proposer_election::UnequivocalProposerElection, }, logging::{LogEvent, LogSchema},