Skip to content

Commit

Permalink
node: add incremental RollingFinality
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia committed Jun 27, 2024
1 parent 7d30b9f commit f073106
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 51 deletions.
193 changes: 145 additions & 48 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ use crate::database::{self, Candidate, Ledger, Mempool, Metadata};
use crate::{vm, Message, Network};
use anyhow::{anyhow, Result};
use dusk_consensus::commons::{ConsensusError, TimeoutSet};
use dusk_consensus::config::{
CONSENSUS_ROLLING_FINALITY_THRESHOLD, MAX_STEP_TIMEOUT, MIN_STEP_TIMEOUT,
};
use dusk_consensus::config::{MAX_STEP_TIMEOUT, MIN_STEP_TIMEOUT};
use dusk_consensus::user::provisioners::{ContextProvisioners, Provisioners};
use node_data::bls::PublicKey;
use node_data::ledger::{
Expand All @@ -23,6 +21,7 @@ use execution_core::stake::Unstake;
use metrics::{counter, gauge, histogram};
use node_data::message::payload::Vote;
use node_data::{Serializable, StepName};
use std::collections::BTreeMap;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use tokio::sync::RwLock;
Expand All @@ -42,6 +41,8 @@ const CANDIDATES_DELETION_OFFSET: u64 = 10;
/// future message.
const OFFSET_FUTURE_MSGS: u64 = 5;

pub type RollingFinalityResult = ([u8; 32], BTreeMap<u64, [u8; 32]>);

#[allow(dead_code)]
pub(crate) enum RevertTarget {
Commit([u8; 32]),
Expand Down Expand Up @@ -396,11 +397,12 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
}
}

/// Return true if the accepted blocks triggered a rolling finality
pub(crate) async fn try_accept_block(
&mut self,
blk: &Block,
enable_consensus: bool,
) -> anyhow::Result<Label> {
) -> anyhow::Result<bool> {
let mut task = self.task.write().await;

let mut tip = self.tip.write().await;
Expand All @@ -427,27 +429,26 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
let mut block_size_on_disk = 0;
let mut slashed_count: usize = 0;
// Persist block in consistency with the VM state update
let label = {
let (label, finalized) = {
let header = blk.header();

let vm = self.vm.write().await;
let (txs, label) = self.db.read().await.update(|db| {
let (txs, rolling_result) = self.db.read().await.update(|db| {
let (txs, verification_output) = vm.accept(blk)?;

est_elapsed_time = start.elapsed();

assert_eq!(header.state_hash, verification_output.state_root);
assert_eq!(header.event_hash, verification_output.event_hash);

let tip_is_final = tip.is_final();

let label =
self.rolling_finality::<DB>(pni, tip_is_final, blk, db)?;
let rolling_results =
self.rolling_finality::<DB>(pni, blk, db)?;

let label = rolling_results.0;
// Store block with updated transactions with Error and GasSpent
block_size_on_disk = db.store_block(header, &txs, label)?;

Ok((txs, label))
Ok((txs, rolling_results))
})?;

self.log_missing_iterations(
Expand All @@ -473,14 +474,23 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
provisioners_list.update_and_swap(new_prov)
}

let (label, final_results) = rolling_result;
// Update tip
*tip = BlockWithLabel::new_with_label(blk.clone(), label);

if tip.is_final() {
vm.finalize_state(tip.inner().header().state_hash)?;
let finalized = final_results.is_some();

if let Some((prev_final_state, mut new_finals)) = final_results {
let (_, new_final_state) =
new_finals.pop_last().expect("new_finals to be not empty");
let states_to_forget = new_finals
.into_values()
.chain([prev_final_state])
.collect::<Vec<_>>();
vm.finalize_state(new_final_state, states_to_forget)?;
}

anyhow::Ok(label)
anyhow::Ok((label, finalized))
}?;

// Abort consensus.
Expand Down Expand Up @@ -570,50 +580,137 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
);
}

Ok(label)
Ok(finalized)
}

/// Perform the rolling finality checks, updating the database with new
/// labels if required
///
/// Returns
/// - Current accepted block label
/// - Previous last finalized state root
/// - List of the new finalized state root
fn rolling_finality<D: database::DB>(
&self,
pni: u8,
tip_is_final: bool,
blk: &Block,
db: &D::P<'_>,
) -> Result<Label, anyhow::Error> {
let attested = pni == 0;
let label = match (attested, tip_is_final) {
(true, true) => Label::Final,
(false, _) => Label::Accepted,
(true, _) => {
let current = blk.header().height;
let target = current
.checked_sub(CONSENSUS_ROLLING_FINALITY_THRESHOLD)
.unwrap_or_default();
for h in (target..current).rev() {
match db.fetch_block_label_by_height(h)? {
None => {
panic!("Cannot find block label for height: {h}")
}
Some((_, Label::Final)) => {
warn!("Found Attested block following a Final one");
break;
}
Some((_, Label::Accepted)) => {
return Ok(Label::Attested)
}
Some((_, Label::Attested)) => {} // just continue scan
};
) -> Result<(Label, Option<RollingFinalityResult>)> {
let confirmed_after = match pni {
0 => 1u64,
n => 2 * n as u64,
};
let block_label = if pni == 0 {
Label::Attested(confirmed_after)
} else {
return Ok((Label::Accepted(confirmed_after), None));
};
let mut finalized_blocks = BTreeMap::new();

let current_height = blk.header().height;
let mut labels = BTreeMap::new();

// Retrieve latest blocks up to the Last Finalized Block
let mut lfb_hash = None;
for height in (0..current_height).rev() {
let (hash, label) = db.fetch_block_label_by_height(height)?.ok_or(
anyhow!("Cannot find block label for height {height}"),
)?;
if let Label::Final(_) = label {
lfb_hash = Some(hash);
break;
}
labels.insert(height, (hash, label));
}
let lfb_hash =
lfb_hash.expect("Unable to find last finalized block hash");
let lfb_state_root = db
.fetch_block_header(&lfb_hash)?
.ok_or(anyhow!(
"Cannot get header for last finalized block hash {}",
to_str(&lfb_hash)
))?
.0
.state_hash;

// A block is considered stable when is either Confirmed or Attested
// We start with `stable_count=1` because we are sure to be processing
// an Attested block
let mut stable_count = 1;

// Iterate from TIP to LFB to set Label::Confirmed
for (&height, (hash, label)) in labels.iter_mut().rev() {
match label {
Label::Accepted(ref confirmed_after)
| Label::Attested(ref confirmed_after) => {
if &stable_count >= confirmed_after {
info!(
event = "block confirmed",
src = "rolling_finality",
current_height,
height,
confirmed_after,
hash = to_str(hash),
?label,
);
*label = Label::Confirmed(current_height - height);
db.store_block_label(height, hash, *label)?;
stable_count += 1;
} else {
break;
}
}
Label::Confirmed(_) => {
stable_count += 1;
continue;
}
Label::Final(_) => {
warn!("Found a final block during rolling finality scan. This should be a bug");
break;
}
info!(
event = "rolling finality",
height = blk.header().height,
hash = to_str(&blk.header().hash),
state_hash = to_str(&blk.header().state_hash),
);
Label::Final
}
}

// Iterate from LFB to tip to set Label::Final
for (height, (hash, mut label)) in labels.into_iter() {
match label {
Label::Final(_) => {
warn!("Found a final block during rolling finality. This should be a bug")
}
Label::Accepted(_) | Label::Attested(_) => break,
Label::Confirmed(_) => {
let finalized_after = current_height - height;
label = Label::Final(finalized_after);
db.store_block_label(height, &hash, label)?;

let state_hash = db
.fetch_block_header(&hash)?
.map(|(h, _)| h.state_hash)
.ok_or(anyhow!(
"Cannot get header for hash {}",
to_str(&hash)
))?;
info!(
event = "block finalized",
src = "rolling_finality",
current_height,
height,
finalized_after,
hash = to_str(&hash),
state_root = to_str(&state_hash),
);
finalized_blocks.insert(height, state_hash);
}
}
}

let finalized_result = if finalized_blocks.is_empty() {
None
} else {
Some((lfb_state_root, finalized_blocks))
};
Ok(label)

Ok((block_label, finalized_result))
}

/// Implements the algorithm of full revert to any of supported targets.
Expand Down
6 changes: 3 additions & 3 deletions node/src/chain/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{vm, Network};

use crate::database::{Candidate, Ledger};
use metrics::counter;
use node_data::ledger::{to_str, Attestation, Block, Label};
use node_data::ledger::{to_str, Attestation, Block};
use node_data::message::payload::{
GetBlocks, GetResource, Inv, RatificationResult, Vote,
};
Expand Down Expand Up @@ -589,11 +589,11 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {

// Try accepting consecutive block
if remote_height == local_header.height + 1 {
let label = acc.try_accept_block(remote_blk, true).await?;
let finalized = acc.try_accept_block(remote_blk, true).await?;

// On first final block accepted while we're inSync, clear
// blacklisted blocks
if let Label::Final = label {
if finalized {
self.blacklisted_blocks.write().await.clear();
}

Expand Down

0 comments on commit f073106

Please sign in to comment.