Skip to content

Commit

Permalink
lock
Browse files Browse the repository at this point in the history
  • Loading branch information
danielxiangzl committed Nov 23, 2024
1 parent f6ecbe8 commit 85d9601
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 173 deletions.
347 changes: 175 additions & 172 deletions consensus/src/liveness/proposal_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ pub struct ProposalGenerator {
allow_batches_without_pos_in_proposal: bool,
opt_qs_payload_param_provider: Arc<dyn TOptQSPullParamsProvider>,

last_proposed_proposal: Arc<Mutex<Option<(BlockData, bool)>>>,
proposed_proposals: Arc<DashMap<Round, (BlockData, bool)>>,
}

impl ProposalGenerator {
Expand Down Expand Up @@ -311,7 +311,7 @@ impl ProposalGenerator {
vtxn_config,
allow_batches_without_pos_in_proposal,
opt_qs_payload_param_provider,
last_proposed_proposal: Arc::new(Mutex::new(None)),
proposed_proposals: Arc::new(DashMap::new()),
}
}

Expand Down Expand Up @@ -354,15 +354,18 @@ impl ProposalGenerator {
wait_callback: BoxFuture<'static, ()>,
is_opt: bool,
) -> anyhow::Result<Option<BlockData>> {
if let Some((proposed_block, proposed_opt)) = &mut *self.last_proposed_proposal.lock() {
if round < proposed_block.round() {
return Err(format_err!(
"OldRound: requested round {} is less than last proposed round {}",
round,
proposed_block.round()
));
}
if round == proposed_block.round() {
let latest_proposed_round = self.proposed_proposals.iter().map(|entry| *entry.key()).max();
if round < latest_proposed_round.unwrap_or(0) {
bail!(
"Round {} is lower than the latest proposed round {}",
round,
latest_proposed_round.unwrap_or(0)
);
}
self.proposed_proposals.retain(|r, _| *r < round);
match self.proposed_proposals.entry(round) {
dashmap::mapref::entry::Entry::Occupied(mut entry) => {
let (proposed_block, proposed_opt) = entry.get();
if is_opt || !*proposed_opt {
return Ok(None);
}
Expand Down Expand Up @@ -396,179 +399,179 @@ impl ProposalGenerator {
hqc,
)
};
*proposed_block = block_data.clone();
*proposed_opt = is_opt;
entry.insert((block_data.clone(), is_opt));

return Ok(Some(block_data));
Ok(Some(block_data))
}
};
dashmap::mapref::entry::Entry::Vacant(entry) => {
let maybe_optqs_payload_pull_params = self.opt_qs_payload_param_provider.get_params();

let hqc = self.ensure_highest_quorum_cert(round)?;

let (validator_txns, payload, timestamp) = if hqc.certified_block().has_reconfiguration() {
// Reconfiguration rule - we propose empty blocks with parents' timestamp
// after reconfiguration until it's committed
(
vec![],
Payload::empty(
self.quorum_store_enabled,
self.allow_batches_without_pos_in_proposal,
),
hqc.certified_block().timestamp_usecs(),
)
} else {
// One needs to hold the blocks with the references to the payloads while get_block is
// being executed: pending blocks vector keeps all the pending ancestors of the extended branch.
let mut pending_blocks = self
.block_store
.path_from_commit_root(hqc.certified_block().id())
.ok_or_else(|| format_err!("HQC {} already pruned", hqc.certified_block().id()))?;
// Avoid txn manager long poll if the root block has txns, so that the leader can
// deliver the commit proof to others without delay.
pending_blocks.push(self.block_store.commit_root());

// Exclude all the pending transactions: these are all the ancestors of
// parent (including) up to the root (including).
let exclude_payload: Vec<_> = pending_blocks
.iter()
.flat_map(|block| block.payload())
.collect();
let payload_filter = PayloadFilter::from(&exclude_payload);

let pending_ordering = self
.block_store
.path_from_ordered_root(hqc.certified_block().id())
.ok_or_else(|| format_err!("HQC {} already pruned", hqc.certified_block().id()))?
.iter()
.any(|block| !block.payload().map_or(true, |txns| txns.is_empty()));

let maybe_optqs_payload_pull_params = self.opt_qs_payload_param_provider.get_params();
// All proposed blocks in a branch are guaranteed to have increasing timestamps
// since their predecessor block will not be added to the BlockStore until
// the local time exceeds it.
let timestamp = self.time_service.get_current_timestamp();

let hqc = self.ensure_highest_quorum_cert(round)?;
let voting_power_ratio = proposer_election.get_voting_power_participation_ratio(round);

let (validator_txns, payload, timestamp) = if hqc.certified_block().has_reconfiguration() {
// Reconfiguration rule - we propose empty blocks with parents' timestamp
// after reconfiguration until it's committed
(
vec![],
Payload::empty(
self.quorum_store_enabled,
self.allow_batches_without_pos_in_proposal,
),
hqc.certified_block().timestamp_usecs(),
)
} else {
// One needs to hold the blocks with the references to the payloads while get_block is
// being executed: pending blocks vector keeps all the pending ancestors of the extended branch.
let mut pending_blocks = self
.block_store
.path_from_commit_root(hqc.certified_block().id())
.ok_or_else(|| format_err!("HQC {} already pruned", hqc.certified_block().id()))?;
// Avoid txn manager long poll if the root block has txns, so that the leader can
// deliver the commit proof to others without delay.
pending_blocks.push(self.block_store.commit_root());

// Exclude all the pending transactions: these are all the ancestors of
// parent (including) up to the root (including).
let exclude_payload: Vec<_> = pending_blocks
.iter()
.flat_map(|block| block.payload())
.collect();
let payload_filter = PayloadFilter::from(&exclude_payload);

let pending_ordering = self
.block_store
.path_from_ordered_root(hqc.certified_block().id())
.ok_or_else(|| format_err!("HQC {} already pruned", hqc.certified_block().id()))?
.iter()
.any(|block| !block.payload().map_or(true, |txns| txns.is_empty()));

// All proposed blocks in a branch are guaranteed to have increasing timestamps
// since their predecessor block will not be added to the BlockStore until
// the local time exceeds it.
let timestamp = self.time_service.get_current_timestamp();

let voting_power_ratio = proposer_election.get_voting_power_participation_ratio(round);

let (
max_block_txns,
max_block_txns_after_filtering,
max_txns_from_block_to_execute,
proposal_delay,
) = self
.calculate_max_block_sizes(voting_power_ratio, timestamp, round)
.await;

PROPOSER_MAX_BLOCK_TXNS_AFTER_FILTERING.observe(max_block_txns_after_filtering as f64);
if let Some(max_to_execute) = max_txns_from_block_to_execute {
PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE.observe(max_to_execute as f64);
}
let (
max_block_txns,
max_block_txns_after_filtering,
max_txns_from_block_to_execute,
proposal_delay,
) = self
.calculate_max_block_sizes(voting_power_ratio, timestamp, round)
.await;

PROPOSER_MAX_BLOCK_TXNS_AFTER_FILTERING.observe(max_block_txns_after_filtering as f64);
if let Some(max_to_execute) = max_txns_from_block_to_execute {
PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE.observe(max_to_execute as f64);
}

PROPOSER_DELAY_PROPOSAL.observe(proposal_delay.as_secs_f64());
if !proposal_delay.is_zero() {
tokio::time::sleep(proposal_delay).await;
}
PROPOSER_DELAY_PROPOSAL.observe(proposal_delay.as_secs_f64());
if !proposal_delay.is_zero() {
tokio::time::sleep(proposal_delay).await;
}

let max_pending_block_size = pending_blocks
.iter()
.map(|block| {
block.payload().map_or(PayloadTxnsSize::zero(), |p| {
PayloadTxnsSize::new(p.len() as u64, p.size() as u64)
})
})
.reduce(PayloadTxnsSize::maximum)
.unwrap_or_default();
// Use non-backpressure reduced values for computing fill_fraction
let max_fill_fraction =
(max_pending_block_size.count() as f32 / self.max_block_txns.count() as f32).max(
max_pending_block_size.size_in_bytes() as f32
/ self.max_block_txns.size_in_bytes() as f32,
);
PROPOSER_PENDING_BLOCKS_COUNT.set(pending_blocks.len() as i64);
PROPOSER_PENDING_BLOCKS_FILL_FRACTION.set(max_fill_fraction as f64);
let max_pending_block_size = pending_blocks
.iter()
.map(|block| {
block.payload().map_or(PayloadTxnsSize::zero(), |p| {
PayloadTxnsSize::new(p.len() as u64, p.size() as u64)
})
})
.reduce(PayloadTxnsSize::maximum)
.unwrap_or_default();
// Use non-backpressure reduced values for computing fill_fraction
let max_fill_fraction =
(max_pending_block_size.count() as f32 / self.max_block_txns.count() as f32).max(
max_pending_block_size.size_in_bytes() as f32
/ self.max_block_txns.size_in_bytes() as f32,
);
PROPOSER_PENDING_BLOCKS_COUNT.set(pending_blocks.len() as i64);
PROPOSER_PENDING_BLOCKS_FILL_FRACTION.set(max_fill_fraction as f64);

let pending_validator_txn_hashes: HashSet<HashValue> = pending_blocks
.iter()
.filter_map(|block| block.validator_txns())
.flatten()
.map(ValidatorTransaction::hash)
.collect();
let validator_txn_filter =
vtxn_pool::TransactionFilter::PendingTxnHashSet(pending_validator_txn_hashes);

let (validator_txns, mut payload) = self
.payload_client
.pull_payload(
PayloadPullParameters {
max_poll_time: self.quorum_store_poll_time.saturating_sub(proposal_delay),
max_txns: max_block_txns,
max_txns_after_filtering: max_block_txns_after_filtering,
soft_max_txns_after_filtering: max_txns_from_block_to_execute
.unwrap_or(max_block_txns_after_filtering),
max_inline_txns: self.max_inline_txns,
maybe_optqs_payload_pull_params,
user_txn_filter: payload_filter,
pending_ordering,
pending_uncommitted_blocks: pending_blocks.len(),
recent_max_fill_fraction: max_fill_fraction,
block_timestamp: timestamp,
},
validator_txn_filter,
wait_callback,
)
.await
.context("Fail to retrieve payload")?;

let pending_validator_txn_hashes: HashSet<HashValue> = pending_blocks
.iter()
.filter_map(|block| block.validator_txns())
.flatten()
.map(ValidatorTransaction::hash)
.collect();
let validator_txn_filter =
vtxn_pool::TransactionFilter::PendingTxnHashSet(pending_validator_txn_hashes);

let (validator_txns, mut payload) = self
.payload_client
.pull_payload(
PayloadPullParameters {
max_poll_time: self.quorum_store_poll_time.saturating_sub(proposal_delay),
max_txns: max_block_txns,
max_txns_after_filtering: max_block_txns_after_filtering,
soft_max_txns_after_filtering: max_txns_from_block_to_execute
.unwrap_or(max_block_txns_after_filtering),
max_inline_txns: self.max_inline_txns,
maybe_optqs_payload_pull_params,
user_txn_filter: payload_filter,
pending_ordering,
pending_uncommitted_blocks: pending_blocks.len(),
recent_max_fill_fraction: max_fill_fraction,
block_timestamp: timestamp,
},
validator_txn_filter,
wait_callback,
)
.await
.context("Fail to retrieve payload")?;

if !payload.is_direct()
&& max_txns_from_block_to_execute.is_some()
&& max_txns_from_block_to_execute.map_or(false, |v| payload.len() as u64 > v)
{
payload = payload.transform_to_quorum_store_v2(max_txns_from_block_to_execute);
}
(validator_txns, payload, timestamp.as_micros() as u64)
};
if !payload.is_direct()
&& max_txns_from_block_to_execute.is_some()
&& max_txns_from_block_to_execute.map_or(false, |v| payload.len() as u64 > v)
{
payload = payload.transform_to_quorum_store_v2(max_txns_from_block_to_execute);
}
(validator_txns, payload, timestamp.as_micros() as u64)
};

let (quorum_cert, failed_authors) = match is_opt {
true => (QuorumCert::empty(), vec![]),
false => {
let hqc = hqc.as_ref().clone();
let failed_authors = self.compute_failed_authors(
round,
hqc.certified_block().round(),
false,
proposer_election,
);
(hqc, failed_authors)
},
};
let (quorum_cert, failed_authors) = match is_opt {
true => (QuorumCert::empty(), vec![]),
false => {
let hqc = hqc.as_ref().clone();
let failed_authors = self.compute_failed_authors(
round,
hqc.certified_block().round(),
false,
proposer_election,
);
(hqc, failed_authors)
},
};

let block = if self.vtxn_config.enabled() {
BlockData::new_proposal_ext(
validator_txns,
payload,
self.author,
failed_authors,
epoch,
round,
timestamp,
quorum_cert,
)
} else {
BlockData::new_proposal(
payload,
self.author,
failed_authors,
epoch,
round,
timestamp,
quorum_cert,
)
};
let block = if self.vtxn_config.enabled() {
BlockData::new_proposal_ext(
validator_txns,
payload,
self.author,
failed_authors,
epoch,
round,
timestamp,
quorum_cert,
)
} else {
BlockData::new_proposal(
payload,
self.author,
failed_authors,
epoch,
round,
timestamp,
quorum_cert,
)
};

self.last_proposed_proposal.lock().replace((block.clone(), is_opt));
entry.insert((block.clone(), is_opt));

Ok(Some(block))
Ok(Some(block))
},
}
}

async fn calculate_max_block_sizes(
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/round_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
proposal_generator::ProposalGenerator,
proposal_status_tracker::TPastProposalStatusTracker,
proposer_election::ProposerElection,
round_state::{NewRoundEvent, NewRoundReason, RoundState, RoundStateLogSchema},
round_state::{self, NewRoundEvent, NewRoundReason, RoundState, RoundStateLogSchema},
unequivocal_proposer_election::UnequivocalProposerElection,
},
logging::{LogEvent, LogSchema},
Expand Down

0 comments on commit 85d9601

Please sign in to comment.