Skip to content

Commit

Permalink
Merge pull request #2216 from dusk-network/fix-1904
Browse files Browse the repository at this point in the history
node: Implement a FSM for stalled_chain states
  • Loading branch information
goshawk-3 authored Sep 10, 2024
2 parents d0e5cb8 + 107951d commit 75293b2
Show file tree
Hide file tree
Showing 5 changed files with 409 additions and 136 deletions.
35 changes: 5 additions & 30 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod genesis;

mod header_validation;
mod metrics;
mod stall_chain_fsm;

use self::acceptor::Acceptor;
use self::fsm::SimpleFSM;
Expand Down Expand Up @@ -44,7 +45,6 @@ const TOPICS: &[u8] = &[
Topics::Quorum as u8,
];

const ACCEPT_BLOCK_TIMEOUT_SEC: Duration = Duration::from_secs(20);
const HEARTBEAT_SEC: Duration = Duration::from_secs(1);

pub struct ChainSrv<N: Network, DB: database::DB, VM: vm::VMExecution> {
Expand Down Expand Up @@ -110,14 +110,11 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
acc.write().await.spawn_task().await;

// Start-up FSM instance
let mut fsm = SimpleFSM::new(acc.clone(), network.clone());
let mut fsm = SimpleFSM::new(acc.clone(), network.clone()).await;

let outbound_chan = acc.read().await.get_outbound_chan().await;
let result_chan = acc.read().await.get_result_chan().await;

// Accept_Block timeout is activated when a node is unable to accept a
// valid block within a specified time frame.
let mut timeout = Self::next_timeout();
let mut heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();

// Message loop for Chain context
Expand Down Expand Up @@ -156,10 +153,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
// By disabling block broadcast, a block may be received from a peer
// only after explicit request (on demand).
match fsm.on_block_event(*blk, msg.metadata).await {
Ok(None) => {}
Ok(Some(_)) => {
timeout = Self::next_timeout();
}
Ok(_) => {}
Err(err) => {
error!(event = "fsm::on_event failed", src = "wire", err = ?err);
}
Expand All @@ -181,11 +175,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
warn!("msg discarded: {e}");
}
match fsm.on_quorum_msg(payload, &msg).await {
Ok(None) => {}
Ok(Some(_)) => {
// block accepted, timeout reset
timeout = Self::next_timeout();
}
Ok(_) => {}
Err(err) => {
warn!(event = "quorum msg", ?err);
}
Expand All @@ -203,11 +193,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
// the winner block will be compiled and redirected to the Acceptor.
if let Payload::Quorum(quorum) = &msg.payload {
match fsm.on_quorum_msg(quorum, &msg).await {
Ok(None) => {}
Ok(Some(_)) => {
// block accepted, timeout reset
timeout = Self::next_timeout();
}
Ok(_) => {}
Err(err) => {
warn!(event = "handle quorum msg from internal consensus failed", ?err);
}
Expand All @@ -217,11 +203,6 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
if let Err(e) = network.read().await.broadcast(&msg).await {
warn!("Unable to re-route message {e}");
}
},
// Handles accept_block_timeout event
_ = sleep_until(timeout) => {
fsm.on_idle(ACCEPT_BLOCK_TIMEOUT_SEC).await;
timeout = Self::next_timeout();
},
// Handles heartbeat event
_ = sleep_until(heartbeat) => {
Expand Down Expand Up @@ -315,10 +296,4 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {

Ok(block)
}

/// Computes the deadline for the next accept-block timeout: the current
/// instant plus `ACCEPT_BLOCK_TIMEOUT_SEC`.
///
/// # Panics
///
/// Panics if adding the timeout to the current instant overflows
/// (`checked_add` returns `None`).
fn next_timeout() -> Instant {
Instant::now()
.checked_add(ACCEPT_BLOCK_TIMEOUT_SEC)
.unwrap()
}
}
64 changes: 56 additions & 8 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use node_data::ledger::{
use node_data::message::AsyncQueue;
use node_data::message::Payload;

use core::panic;
use dusk_consensus::operations::Voter;
use execution_core::stake::{Withdraw, STAKE_CONTRACT};
use metrics::{counter, gauge, histogram};
Expand Down Expand Up @@ -60,7 +61,7 @@ pub(crate) enum RevertTarget {
/// Acceptor also manages the initialization and lifespan of Consensus task.
pub(crate) struct Acceptor<N: Network, DB: database::DB, VM: vm::VMExecution> {
/// The tip
tip: RwLock<BlockWithLabel>,
pub(crate) tip: RwLock<BlockWithLabel>,

/// Provisioners needed to verify next block
pub(crate) provisioners_list: RwLock<ContextProvisioners>,
Expand All @@ -70,7 +71,7 @@ pub(crate) struct Acceptor<N: Network, DB: database::DB, VM: vm::VMExecution> {

pub(crate) db: Arc<RwLock<DB>>,
pub(crate) vm: Arc<RwLock<VM>>,
network: Arc<RwLock<N>>,
pub(crate) network: Arc<RwLock<N>>,

event_sender: Sender<Event>,
}
Expand Down Expand Up @@ -833,7 +834,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
// The blockchain tip after reverting
let (blk, (_, label)) = self.db.read().await.update(|t| {
let mut height = curr_height;
while height != 0 {
loop {
let b = Ledger::fetch_block_by_height(t, height)?
.ok_or_else(|| anyhow::anyhow!("could not fetch block"))?;
let h = b.header();
Expand All @@ -846,6 +847,11 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
return Ok((b, label));
}

// the target_state_hash could not be found
if height == 0 {
panic!("revert to genesis block failed");
}

info!(
event = "block deleted",
height = h.height,
Expand All @@ -870,8 +876,6 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {

height -= 1;
}

Err(anyhow!("not found"))
})?;

if blk.header().state_hash != target_state_hash {
Expand Down Expand Up @@ -931,7 +935,8 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
}

pub(crate) async fn get_latest_final_block(&self) -> Result<Block> {
let tip = self.tip.read().await;
let tip: tokio::sync::RwLockReadGuard<'_, BlockWithLabel> =
self.tip.read().await;
if tip.is_final() {
return Ok(tip.inner().clone());
}
Expand Down Expand Up @@ -960,8 +965,8 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
Ok(final_block)
}

pub(crate) async fn get_curr_iteration(&self) -> u8 {
self.tip.read().await.inner().header().iteration
/// Returns a copy of the current chain tip together with its label.
///
/// Acquires a read lock on `tip` and clones the guarded
/// `BlockWithLabel`, so the caller gets an owned snapshot rather than a
/// guard.
pub(crate) async fn get_curr_tip(&self) -> BlockWithLabel {
self.tip.read().await.clone()
}

pub(crate) async fn get_result_chan(
Expand Down Expand Up @@ -1068,6 +1073,49 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {

histogram!("dusk_block_disk_size").record(block_size_on_disk as f64);
}

/// Verifies if a block with header `local` can be replaced with a block
/// with header `new`.
///
/// The header `new` is validated by `verify_block_header` against the
/// predecessor of `local` (looked up via `local.prev_block_hash`), using
/// the provisioners fetched from the VM at the predecessor's state hash,
/// with the changed provisioners for that same state hash applied.
///
/// # Errors
///
/// Returns an error if:
/// - the predecessor header of `local` cannot be fetched from the
///   database;
/// - the VM fails to return the provisioners (or their changes) for the
///   predecessor's state hash;
/// - `verify_block_header` rejects `new`.
pub(crate) async fn verify_header_against_local(
    &self,
    local: &ledger::Header,
    new: &ledger::Header,
) -> Result<()> {
    let prev_header = self.db.read().await.view(|t| {
        let prev_hash = &local.prev_block_hash;
        // Build the error lazily: formatting the hash is only needed when
        // the lookup actually comes back empty.
        t.fetch_block_header(prev_hash)?.ok_or_else(|| {
            anyhow::anyhow!(
                "Unable to find block with hash {}",
                to_str(prev_hash)
            )
        })
    })?;

    // Take the VM read lock once so that both queries observe the same VM
    // state, and release it before the header verification is awaited.
    let provisioners_list = {
        let vm = self.vm.read().await;

        let mut list = ContextProvisioners::new(
            vm.get_provisioners(prev_header.state_hash)?,
        );
        list.apply_changes(
            vm.get_changed_provisioners(prev_header.state_hash)?,
        );

        list
    };

    // Ensure header of the new block is valid according to prev_block
    // header
    let _ = verify_block_header(
        self.db.clone(),
        &prev_header,
        &provisioners_list,
        new,
    )
    .await?;

    Ok(())
}
}

async fn broadcast<N: Network>(network: &Arc<RwLock<N>>, msg: &Message) {
Expand Down
55 changes: 4 additions & 51 deletions node/src/chain/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
// Copyright (c) DUSK NETWORK. All rights reserved.

use anyhow::{anyhow, Result};
use dusk_consensus::user::provisioners::ContextProvisioners;
use node_data::ledger::{to_str, Header};
use node_data::ledger::Header;
use std::cmp::Ordering;
use tracing::info;

use crate::{
chain::acceptor,
database::{self, Ledger},
database::{self},
vm, Network,
};

Expand Down Expand Up @@ -42,17 +40,6 @@ impl<'a, N: Network, DB: database::DB, VM: vm::VMExecution>
local: &Header,
remote: &Header,
revert_target: RevertTarget,
) -> Result<()> {
self.verify_header(local, remote).await?;
self.acc.try_revert(revert_target).await
}

/// Verifies if a block with header `local` can be replaced with a block
/// with header `remote`
async fn verify_header(
&self,
local: &Header,
remote: &Header,
) -> Result<()> {
match (local.height, remote.iteration.cmp(&local.iteration)) {
(0, _) => Err(anyhow!("cannot fallback over genesis block")),
Expand All @@ -68,48 +55,14 @@ impl<'a, N: Network, DB: database::DB, VM: vm::VMExecution>
_ => Ok(()),
}?;

let prev_header = self.acc.db.read().await.view(|t| {
let prev_hash = &local.prev_block_hash;
t.fetch_block_header(prev_hash)?.ok_or(anyhow::anyhow!(
"Unable to find block with hash {}",
to_str(prev_hash)
))
})?;

info!(
event = "execute fallback checks",
height = local.height,
iter = local.iteration,
target_iter = remote.iteration,
);

let provisioners_list = self
.acc
.vm
.read()
.await
.get_provisioners(prev_header.state_hash)?;

let mut provisioners_list = ContextProvisioners::new(provisioners_list);

let changed_provisioners = self
.acc
.vm
.read()
.await
.get_changed_provisioners(prev_header.state_hash)?;
provisioners_list.apply_changes(changed_provisioners);

// Ensure header of the new block is valid according to prev_block
// header
let _ = acceptor::verify_block_header(
self.acc.db.clone(),
&prev_header,
&provisioners_list,
remote,
)
.await?;

Ok(())
self.acc.verify_header_against_local(local, remote).await?;
self.acc.try_revert(revert_target).await
}
}
Loading

0 comments on commit 75293b2

Please sign in to comment.