Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node: Implement a FSM for stalled_chain states #2216

Merged
merged 13 commits into from
Sep 10, 2024
Merged
35 changes: 5 additions & 30 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod genesis;

mod header_validation;
mod metrics;
mod stall_chain_fsm;

use self::acceptor::Acceptor;
use self::fsm::SimpleFSM;
Expand Down Expand Up @@ -44,7 +45,6 @@ const TOPICS: &[u8] = &[
Topics::Quorum as u8,
];

const ACCEPT_BLOCK_TIMEOUT_SEC: Duration = Duration::from_secs(20);
const HEARTBEAT_SEC: Duration = Duration::from_secs(1);

pub struct ChainSrv<N: Network, DB: database::DB, VM: vm::VMExecution> {
Expand Down Expand Up @@ -110,14 +110,11 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
acc.write().await.spawn_task().await;

// Start-up FSM instance
let mut fsm = SimpleFSM::new(acc.clone(), network.clone());
let mut fsm = SimpleFSM::new(acc.clone(), network.clone()).await;

let outbound_chan = acc.read().await.get_outbound_chan().await;
let result_chan = acc.read().await.get_result_chan().await;

// Accept_Block timeout is activated when a node is unable to accept a
// valid block within a specified time frame.
let mut timeout = Self::next_timeout();
let mut heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();

// Message loop for Chain context
Expand Down Expand Up @@ -156,10 +153,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
// By disabling block broadcast, a block may be received from a peer
// only after explicit request (on demand).
match fsm.on_block_event(*blk, msg.metadata).await {
Ok(None) => {}
Ok(Some(_)) => {
timeout = Self::next_timeout();
}
Ok(_) => {}
Err(err) => {
error!(event = "fsm::on_event failed", src = "wire", err = ?err);
}
Expand All @@ -181,11 +175,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
warn!("msg discarded: {e}");
}
match fsm.on_quorum_msg(payload, &msg).await {
Ok(None) => {}
Ok(Some(_)) => {
// block accepted, timeout reset
timeout = Self::next_timeout();
}
Ok(_) => {}
Err(err) => {
warn!(event = "quorum msg", ?err);
}
Expand All @@ -203,11 +193,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
// the winner block will be compiled and redirected to the Acceptor.
if let Payload::Quorum(quorum) = &msg.payload {
match fsm.on_quorum_msg(quorum, &msg).await {
Ok(None) => {}
Ok(Some(_)) => {
// block accepted, timeout reset
timeout = Self::next_timeout();
}
Ok(_) => {}
Err(err) => {
warn!(event = "handle quorum msg from internal consensus failed", ?err);
}
Expand All @@ -217,11 +203,6 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
if let Err(e) = network.read().await.broadcast(&msg).await {
warn!("Unable to re-route message {e}");
}
},
// Handles accept_block_timeout event
_ = sleep_until(timeout) => {
fsm.on_idle(ACCEPT_BLOCK_TIMEOUT_SEC).await;
timeout = Self::next_timeout();
},
// Handles heartbeat event
_ = sleep_until(heartbeat) => {
Expand Down Expand Up @@ -315,10 +296,4 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {

Ok(block)
}

/// Computes the deadline of the next accept-block timeout.
///
/// The deadline is the current instant advanced by
/// `ACCEPT_BLOCK_TIMEOUT_SEC`.
///
/// # Panics
///
/// Panics if `checked_add` returns `None`, i.e. the resulting instant
/// cannot be represented.
fn next_timeout() -> Instant {
    let now = Instant::now();
    let deadline = now.checked_add(ACCEPT_BLOCK_TIMEOUT_SEC);
    deadline.unwrap()
}
}
64 changes: 56 additions & 8 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use node_data::ledger::{
use node_data::message::AsyncQueue;
use node_data::message::Payload;

use core::panic;
use dusk_consensus::operations::Voter;
use execution_core::stake::{Withdraw, STAKE_CONTRACT};
use metrics::{counter, gauge, histogram};
Expand Down Expand Up @@ -60,7 +61,7 @@ pub(crate) enum RevertTarget {
/// Acceptor also manages the initialization and lifespan of Consensus task.
pub(crate) struct Acceptor<N: Network, DB: database::DB, VM: vm::VMExecution> {
/// The tip
tip: RwLock<BlockWithLabel>,
pub(crate) tip: RwLock<BlockWithLabel>,

/// Provisioners needed to verify next block
pub(crate) provisioners_list: RwLock<ContextProvisioners>,
Expand All @@ -70,7 +71,7 @@ pub(crate) struct Acceptor<N: Network, DB: database::DB, VM: vm::VMExecution> {

pub(crate) db: Arc<RwLock<DB>>,
pub(crate) vm: Arc<RwLock<VM>>,
network: Arc<RwLock<N>>,
pub(crate) network: Arc<RwLock<N>>,

event_sender: Sender<Event>,
}
Expand Down Expand Up @@ -833,7 +834,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
// The blockchain tip after reverting
let (blk, (_, label)) = self.db.read().await.update(|t| {
let mut height = curr_height;
while height != 0 {
loop {
let b = Ledger::fetch_block_by_height(t, height)?
.ok_or_else(|| anyhow::anyhow!("could not fetch block"))?;
let h = b.header();
Expand All @@ -846,6 +847,11 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
return Ok((b, label));
}

// the target_state_hash could not be found
if height == 0 {
panic!("revert to genesis block failed");
}

info!(
event = "block deleted",
height = h.height,
Expand All @@ -870,8 +876,6 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {

height -= 1;
}

Err(anyhow!("not found"))
})?;

if blk.header().state_hash != target_state_hash {
Expand Down Expand Up @@ -931,7 +935,8 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
}

pub(crate) async fn get_latest_final_block(&self) -> Result<Block> {
let tip = self.tip.read().await;
let tip: tokio::sync::RwLockReadGuard<'_, BlockWithLabel> =
self.tip.read().await;
if tip.is_final() {
return Ok(tip.inner().clone());
}
Expand Down Expand Up @@ -960,8 +965,8 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
Ok(final_block)
}

pub(crate) async fn get_curr_iteration(&self) -> u8 {
self.tip.read().await.inner().header().iteration
/// Returns an owned copy of the current tip (the block together with its
/// label), cloned while holding the read lock on `tip`.
pub(crate) async fn get_curr_tip(&self) -> BlockWithLabel {
    let tip = self.tip.read().await;
    tip.clone()
}

pub(crate) async fn get_result_chan(
Expand Down Expand Up @@ -1068,6 +1073,49 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {

histogram!("dusk_block_disk_size").record(block_size_on_disk as f64);
}

/// Verifies if a block with header `local` can be replaced with a block
/// with header `new`.
///
/// The header referenced by `local.prev_block_hash` is loaded from the
/// database, the provisioners for that header's state hash are fetched from
/// the VM (with the changed provisioners applied on top), and `new` is then
/// checked with `verify_block_header` against that previous header.
///
/// # Errors
///
/// Returns an error if the database view fails or the previous header is
/// missing, if the VM cannot return the provisioners or the changed
/// provisioners, or if `verify_block_header` rejects `new`.
pub(crate) async fn verify_header_against_local(
    &self,
    local: &ledger::Header,
    new: &ledger::Header,
) -> Result<()> {
    let prev_header = self.db.read().await.view(|t| {
        let prev_hash = &local.prev_block_hash;
        // Build the error lazily so the message is only formatted when the
        // lookup actually misses.
        t.fetch_block_header(prev_hash)?.ok_or_else(|| {
            anyhow::anyhow!(
                "Unable to find block with hash {}",
                to_str(prev_hash)
            )
        })
    })?;

    let provisioners_list = self
        .vm
        .read()
        .await
        .get_provisioners(prev_header.state_hash)?;

    let mut provisioners_list = ContextProvisioners::new(provisioners_list);

    let changed_provisioners = self
        .vm
        .read()
        .await
        .get_changed_provisioners(prev_header.state_hash)?;
    provisioners_list.apply_changes(changed_provisioners);

    // Ensure header of the new block is valid according to prev_block
    // header
    let _ = verify_block_header(
        self.db.clone(),
        &prev_header,
        &provisioners_list,
        new,
    )
    .await?;

    Ok(())
}
}

async fn broadcast<N: Network>(network: &Arc<RwLock<N>>, msg: &Message) {
Expand Down
55 changes: 4 additions & 51 deletions node/src/chain/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
// Copyright (c) DUSK NETWORK. All rights reserved.

use anyhow::{anyhow, Result};
use dusk_consensus::user::provisioners::ContextProvisioners;
use node_data::ledger::{to_str, Header};
use node_data::ledger::Header;
use std::cmp::Ordering;
use tracing::info;

use crate::{
chain::acceptor,
database::{self, Ledger},
database::{self},
vm, Network,
};

Expand Down Expand Up @@ -42,17 +40,6 @@ impl<'a, N: Network, DB: database::DB, VM: vm::VMExecution>
local: &Header,
remote: &Header,
revert_target: RevertTarget,
) -> Result<()> {
self.verify_header(local, remote).await?;
self.acc.try_revert(revert_target).await
}

/// Verifies if a block with header `local` can be replaced with a block
/// with header `remote`
async fn verify_header(
&self,
local: &Header,
remote: &Header,
) -> Result<()> {
match (local.height, remote.iteration.cmp(&local.iteration)) {
(0, _) => Err(anyhow!("cannot fallback over genesis block")),
Expand All @@ -68,48 +55,14 @@ impl<'a, N: Network, DB: database::DB, VM: vm::VMExecution>
_ => Ok(()),
}?;

let prev_header = self.acc.db.read().await.view(|t| {
let prev_hash = &local.prev_block_hash;
t.fetch_block_header(prev_hash)?.ok_or(anyhow::anyhow!(
"Unable to find block with hash {}",
to_str(prev_hash)
))
})?;

info!(
event = "execute fallback checks",
height = local.height,
iter = local.iteration,
target_iter = remote.iteration,
);

let provisioners_list = self
.acc
.vm
.read()
.await
.get_provisioners(prev_header.state_hash)?;

let mut provisioners_list = ContextProvisioners::new(provisioners_list);

let changed_provisioners = self
.acc
.vm
.read()
.await
.get_changed_provisioners(prev_header.state_hash)?;
provisioners_list.apply_changes(changed_provisioners);

// Ensure header of the new block is valid according to prev_block
// header
let _ = acceptor::verify_block_header(
self.acc.db.clone(),
&prev_header,
&provisioners_list,
remote,
)
.await?;

Ok(())
self.acc.verify_header_against_local(local, remote).await?;
self.acc.try_revert(revert_target).await
}
}
Loading
Loading