Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node: Allow fallback to lower-round blocks #1258

Merged
merged 5 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,11 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
self.mrb.read().await.inner().header().height
}

/// Returns chain tip header
pub(crate) async fn header(&self) -> ledger::Header {
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
self.mrb.read().await.inner().header().clone()
}

pub(crate) async fn get_curr_hash(&self) -> [u8; 32] {
self.mrb.read().await.inner().header().hash
}
Expand Down
130 changes: 76 additions & 54 deletions node/src/chain/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
// Copyright (c) DUSK NETWORK. All rights reserved.

use anyhow::{anyhow, Result};
use node_data::ledger::Block;
use dusk_consensus::user::provisioners::ContextProvisioners;
use node_data::ledger;
use node_data::ledger::Header;
use std::cmp::Ordering;
use tracing::info;

use crate::{
Expand Down Expand Up @@ -33,67 +36,86 @@ impl<'a, N: Network, DB: database::DB, VM: vm::VMExecution>
Self { acc }
}

pub(crate) async fn try_execute_fallback(&self, blk: &Block) -> Result<()> {
self.sanity_checks(blk).await?;
self.acc.try_revert(RevertTarget::LastFinalizedState).await
/// Makes an attempt to revert to the specified Target, if remote header is
/// fully valid
pub(crate) async fn try_revert(
&self,
local: &Header,
remote: &Header,
revert_target: RevertTarget,
) -> Result<()> {
self.verify_header(local, remote).await?;
self.acc.try_revert(revert_target).await
}

/// Performs a serias of checks to securely allow fallback execution.
async fn sanity_checks(&self, blk: &Block) -> Result<()> {
let acc = self.acc;

let curr_height = acc.get_curr_height().await;
let curr_iteration = acc.get_curr_iteration().await;

if curr_height < 1 {
return Err(anyhow!("cannot fallback over genesis block"));
}

if blk.header().iteration > curr_iteration {
return Err(anyhow!("iteration is higher than current"));
}

if blk.header().iteration == curr_iteration {
// This may happen only if:
//
// we have more than one winner blocks per a single iteration, same
// round.

// An invalid block was received.
return Err(anyhow!("iteration is equal to the current"));
}
/// Verifies if a block of header `local` can be replaced with a block with
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
/// header `remote`
async fn verify_header(
&self,
local: &Header,
remote: &Header,
) -> Result<()> {
match (local.height, remote.iteration.cmp(&local.iteration)) {
(0, _) => Err(anyhow!("cannot fallback over genesis block")),
(_, Ordering::Greater) => Err(anyhow!(
"iteration {:?} is higher than the current {:?}",
remote.iteration,
local.iteration
)),
(_, Ordering::Equal) => Err(anyhow!(
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
"iteration is equal to the current {:?}",
local.iteration
)),
_ => Ok(()),
}?;

let (prev_header, prev_prev_header) =
self.acc.db.read().await.view(|t| {
let (prev_block_header, _) = t
.fetch_block_header(&local.prev_block_hash)?
.expect("block must exist");

let (prev_prev_block_header, _) = t
.fetch_block_header(&prev_block_header.prev_block_hash)?
.expect("block must exist");

Ok::<(ledger::Header, ledger::Header), anyhow::Error>((
prev_block_header,
prev_prev_block_header,
))
})?;

info!(
event = "starting fallback",
height = curr_height,
iter = curr_iteration,
target_iter = blk.header().iteration,
event = "execute fallback checks",
height = local.height,
iter = local.iteration,
target_iter = remote.iteration,
);

let prev_block_height = curr_height - 1;
let prev_block = acc.db.read().await.view(|v| {
Ledger::fetch_block_by_height(&v, prev_block_height)?
.ok_or_else(|| anyhow::anyhow!("could not fetch block"))
})?;

info!(
event = "fallback checking block",
height = curr_height,
iter = curr_iteration,
target_iter = blk.header().iteration,
);

// Validate Header/Certificate of the new block upon previous block and
// provisioners.

// In an edge case, this may fail on performing fallback between two
// epochs.
let provisioners_list = acc.provisioners_list.read().await;
acceptor::verify_block_header(
let provisioners_list = self
.acc
.vm
.read()
.await
.get_provisioners(prev_header.state_hash)?;

let prev_provisioners_list = self
.acc
.vm
.read()
.await
.get_provisioners(prev_prev_header.state_hash)?;

let mut provisioners_list = ContextProvisioners::new(provisioners_list);
provisioners_list.set_previous(prev_provisioners_list);

// Ensure header of the new block is valid according to prev_block
// header
let _ = acceptor::verify_block_header(
self.acc.db.clone(),
prev_block.header(),
&prev_header,
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
&provisioners_list,
blk.header(),
remote,
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
)
.await?;

Expand Down
124 changes: 101 additions & 23 deletions node/src/chain/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use super::acceptor::Acceptor;
use super::acceptor::{Acceptor, RevertTarget};
use crate::chain::fallback;
use crate::database;
use crate::{vm, Network};

use crate::database::Ledger;
use node_data::ledger::{to_str, Block, Label};
use node_data::message::payload::{GetBlocks, Inv};
use node_data::message::Message;
Expand Down Expand Up @@ -263,35 +264,108 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {

async fn on_event(
&mut self,
blk: &Block,
remote_blk: &Block,
msg: &Message,
) -> anyhow::Result<Option<(Block, SocketAddr)>> {
let mut acc = self.acc.write().await;
let height = blk.header().height;
let tip_height = acc.get_curr_height().await;
let iter = acc.get_curr_iteration().await;
let curr_hash = acc.get_curr_hash().await;
let local_header = acc.header().await;
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
let remote_height = remote_blk.header().height;

if remote_height < local_header.height {
// Ensure that the block does not exist in the local state
let exists = acc
.db
.read()
.await
.view(|t| t.get_block_exists(&remote_blk.header().hash))?;

if exists {
// Already exists in local state
return Ok(None);
}

// Ensure that the block height is higher than the last finalized
// TODO: Retrieve the block from memory
if remote_height
<= acc.get_latest_final_block().await?.header().height
{
return Ok(None);
}

// If our local chain has a block L_B with ConsensusState not Final,
// and we receive a block N_B such that:
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
//
// N_B.PrevBlock == L_B.PrevBlock
// N_B.Iteration < L_B.Iteration
//
// Then we fallback to N_B.PrevBlock and accept N_B
let header = acc.db.read().await.view(|t| {
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
if let Some((prev_header, _)) =
t.fetch_block_header(&remote_blk.header().prev_block_hash)?
{
let l_b_height = prev_header.height + 1;
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
if let Some(l_b) = t.fetch_block_by_height(l_b_height)? {
if remote_blk.header().iteration
< l_b.header().iteration
{
return Ok(Some(l_b.header().clone()));
}
}
}

anyhow::Ok(None)
})?;

if let Some(header) = header {
match fallback::WithContext::new(acc.deref())
.try_revert(
&header,
remote_blk.header(),
RevertTarget::LastFinalizedState,
)
.await
{
Ok(_) => {
if remote_height == acc.get_curr_height().await + 1 {
acc.try_accept_block(remote_blk, true).await?;
return Ok(None);
}
}
Err(e) => {
error!(
event = "fallback failed",
height = local_header.height,
remote_height,
err = format!("{:?}", e)
);
return Ok(None);
}
}
}

if height < tip_height {
return Ok(None);
}

if height == tip_height {
if blk.header().hash == curr_hash {
if remote_height == local_header.height {
if remote_blk.header().hash == local_header.hash {
// Duplicated block.
// Node has already accepted it.
return Ok(None);
}

info!(
event = "entering fallback",
height = tip_height,
iter = iter,
new_iter = blk.header().iteration,
height = local_header.height,
iter = local_header.height,
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
new_iter = remote_blk.header().iteration,
);

match fallback::WithContext::new(acc.deref())
.try_execute_fallback(blk)
.try_revert(
&local_header,
remote_blk.header(),
RevertTarget::LastFinalizedState,
)
.await
{
Err(e) => {
Expand All @@ -306,23 +380,26 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {

// Blacklist the old-block hash so that if it's again
// sent then this node does not try to accept it.
self.blacklisted_blocks.write().await.insert(curr_hash);
self.blacklisted_blocks
.write()
.await
.insert(local_header.hash);

if height == acc.get_curr_height().await + 1 {
if remote_height == acc.get_curr_height().await + 1 {
// If we have fallback-ed to previous block only, then
// accepting the new block would be enough to continue
// in in_Sync mode instead of switching to Out-Of-Sync
// mode.

acc.try_accept_block(blk, true).await?;
acc.try_accept_block(remote_blk, true).await?;
return Ok(None);
}

// By switching to OutOfSync mode, we trigger the
// sync-up procedure to download all missing blocks from the
// main chain.
if let Some(metadata) = &msg.metadata {
let res = (blk.clone(), metadata.src_addr);
let res = (remote_blk.clone(), metadata.src_addr);
return Ok(Some(res));
} else {
return Ok(None);
Expand All @@ -332,8 +409,8 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
}

// Try accepting consecutive block
if height == tip_height + 1 {
let label = acc.try_accept_block(blk, true).await?;
if remote_height == local_header.height + 1 {
let label = acc.try_accept_block(remote_blk, true).await?;

// On first final block accepted while we're inSync, clear
// blacklisted blocks
Expand All @@ -346,7 +423,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
if let Some(metadata) = &msg.metadata {
if let Some(presync) = &mut self.presync {
if metadata.src_addr == presync.peer_addr
&& height == presync.start_height() + 1
&& remote_height == presync.start_height() + 1
{
let res =
(presync.target_blk.clone(), presync.peer_addr);
Expand All @@ -372,12 +449,13 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
if self.presync.is_none() {
self.presync = Some(PresyncInfo::new(
metadata.src_addr,
blk.clone(),
tip_height,
remote_blk.clone(),
local_header.height,
));
}

self.request_block(tip_height + 1, metadata.src_addr).await;
self.request_block(local_header.height + 1, metadata.src_addr)
.await;
}

Ok(None)
Expand Down